diff --git a/cmd/kube-controller-manager/app/apps.go b/cmd/kube-controller-manager/app/apps.go index e5b13862664..d1d2ea77d88 100644 --- a/cmd/kube-controller-manager/app/apps.go +++ b/cmd/kube-controller-manager/app/apps.go @@ -29,11 +29,10 @@ func startStatefulSetController(ctx ControllerContext) (bool, error) { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] { return false, nil } - resyncPeriod := ResyncPeriod(&ctx.Options)() go statefulset.NewStatefulSetController( - ctx.InformerFactory.Pods().Informer(), + ctx.NewInformerFactory.Core().V1().Pods(), + ctx.NewInformerFactory.Apps().V1beta1().StatefulSets(), ctx.ClientBuilder.ClientOrDie("statefulset-controller"), - resyncPeriod, ).Run(1, ctx.Stop) return true, nil } diff --git a/pkg/controller/statefulset/BUILD b/pkg/controller/statefulset/BUILD index dc07b2dd6f6..60f48852fe9 100644 --- a/pkg/controller/statefulset/BUILD +++ b/pkg/controller/statefulset/BUILD @@ -23,17 +23,20 @@ go_library( "//pkg/api/v1/pod:go_default_library", "//pkg/apis/apps/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", - "//pkg/client/legacylisters:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", + "//pkg/client/listers/apps/v1beta1:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", + "//pkg/client/retry:go_default_library", "//pkg/controller:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", + "//vendor:k8s.io/client-go/pkg/api", "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/record", @@ -56,17 +59,22 @@ go_test( "//pkg/api/v1/pod:go_default_library", "//pkg/apis/apps/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/client/informers/informers_generated/externalversions:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library", + "//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library", "//pkg/client/legacylisters:go_default_library", + "//pkg/client/listers/apps/v1beta1:go_default_library", + "//pkg/client/listers/core/v1:go_default_library", "//pkg/controller:go_default_library", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/client-go/testing", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/record", - "//vendor:k8s.io/client-go/util/workqueue", ], ) diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index 00b66c2ecdb..01c7584355e 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -23,11 +23,15 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" errorutils "k8s.io/apimachinery/pkg/util/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/pkg/api" "k8s.io/client-go/tools/record" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" + "k8s.io/kubernetes/pkg/client/retry" ) // StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods, @@ -52,15 +56,17 @@ type StatefulPodControlInterface interface { UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error } -func NewRealStatefulPodControl(client clientset.Interface, recorder record.EventRecorder) StatefulPodControlInterface { - return &realStatefulPodControl{client, recorder} +func NewRealStatefulPodControl(client clientset.Interface, setLister appslisters.StatefulSetLister, podLister corelisters.PodLister, recorder record.EventRecorder) StatefulPodControlInterface { + return &realStatefulPodControl{client, setLister, podLister, recorder} } // realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the // API server. The struct is package private as the internal details are irrelevant to importing packages. type realStatefulPodControl struct { - client clientset.Interface - recorder record.EventRecorder + client clientset.Interface + setLister appslisters.StatefulSetLister + podLister corelisters.PodLister + recorder record.EventRecorder } func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { @@ -80,54 +86,56 @@ func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod } func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { - // we make a copy of the Pod on the stack and mutate the copy - // we copy back to pod to notify the caller of successful mutation - obj, err := api.Scheme.Copy(pod) - if err != nil { - return fmt.Errorf("unable to copy pod: %v", err) - } - podCopy := obj.(*v1.Pod) - for attempt := 0; attempt < maxUpdateRetries; attempt++ { + attemptedUpdate := false + + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { // assume the Pod is consistent consistent := true - // if the Pod does not conform to it's identity, update the identity and dirty the Pod - if !identityMatches(set, podCopy) { - updateIdentity(set, podCopy) + // if the Pod does not conform to its identity, update the identity and dirty the Pod + if !identityMatches(set, pod) { + updateIdentity(set, pod) consistent = false } // if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's, // dirty the Pod, and create any missing PVCs - if !storageMatches(set, podCopy) { - updateStorage(set, podCopy) + if !storageMatches(set, pod) { + updateStorage(set, pod) consistent = false - if err := spc.createPersistentVolumeClaims(set, podCopy); err != nil { + if err := spc.createPersistentVolumeClaims(set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } } // if the Pod is not dirty do nothing if consistent { - *pod = *podCopy return nil } + + attemptedUpdate = true // commit the update, retrying on conflicts - _, err = spc.client.Core().Pods(set.Namespace).Update(podCopy) - if !apierrors.IsConflict(err) { - if err == nil { - *pod = *podCopy + _, err := spc.client.Core().Pods(set.Namespace).Update(pod) + if err == nil { + return nil + } + updateErr := err + + if updated, err := spc.podLister.Pods(set.Namespace).Get(pod.Name); err == nil { + // make a copy so we don't mutate the shared cache + if copy, err := api.Scheme.DeepCopy(updated); err == nil { + pod = copy.(*v1.Pod) + } else { + utilruntime.HandleError(fmt.Errorf("error copying updated Pod: %v", err)) } - spc.recordPodEvent("update", set, pod, err) - return err + } else { + utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", set.Namespace, pod.Name, err)) } - conflicting, err := spc.client.Core().Pods(set.Namespace).Get(podCopy.Name, metav1.GetOptions{}) - if err != nil { - spc.recordPodEvent("update", set, podCopy, err) - return err - } - *podCopy = *conflicting + + return updateErr + }) + if attemptedUpdate { + spc.recordPodEvent("update", set, pod, err) } - spc.recordPodEvent("update", set, pod, updateConflictError) - return updateConflictError + return err } func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { @@ -137,31 +145,28 @@ func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod } func (spc *realStatefulPodControl) UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error { - if set.Status.Replicas == replicas { - return nil - } - obj, err := api.Scheme.Copy(set) - if err != nil { - return fmt.Errorf("unable to copy set: %v", err) - } - setCopy := obj.(*apps.StatefulSet) - setCopy.Status.Replicas = replicas - for attempt := 0; attempt < maxUpdateRetries; attempt++ { - _, err := spc.client.Apps().StatefulSets(setCopy.Namespace).UpdateStatus(setCopy) - if !apierrors.IsConflict(err) { - if err == nil { - *set = *setCopy + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + set.Status.Replicas = replicas + _, err := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set) + if err == nil { + return nil + } + + updateErr := err + + if updated, err := spc.setLister.StatefulSets(set.Namespace).Get(set.Name); err == nil { + // make a copy so we don't mutate the shared cache + if copy, err := api.Scheme.DeepCopy(updated); err == nil { + set = copy.(*apps.StatefulSet) + } else { + utilruntime.HandleError(fmt.Errorf("error copying updated StatefulSet: %v", err)) } - return err + } else { + utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", set.Namespace, set.Name, err)) } - conflicting, err := spc.client.Apps().StatefulSets(setCopy.Namespace).Get(setCopy.Name, metav1.GetOptions{}) - if err != nil { - return err - } - conflicting.Status.Replicas = setCopy.Status.Replicas - *setCopy = *conflicting - } - return updateConflictError + + return updateErr + }) } // recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will diff --git a/pkg/controller/statefulset/stateful_pod_control_test.go b/pkg/controller/statefulset/stateful_pod_control_test.go index a1f9206c16a..8e3e9b95ee7 100644 --- a/pkg/controller/statefulset/stateful_pod_control_test.go +++ b/pkg/controller/statefulset/stateful_pod_control_test.go @@ -25,10 +25,14 @@ import ( "k8s.io/apimachinery/pkg/runtime" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/v1" + podapi "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" ) func TestStatefulPodControlCreatesPods(t *testing.T) { @@ -36,7 +40,7 @@ func TestStatefulPodControlCreatesPods(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource) }) @@ -67,7 +71,7 @@ func TestStatefulPodControlCreatePodExists(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) pvcs := getPersistentVolumeClaims(set, pod) fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { claim := pvcs[action.GetResource().GroupResource().Resource] @@ -80,7 +84,6 @@ func TestStatefulPodControlCreatePodExists(t *testing.T) { fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { return true, pod, apierrors.NewAlreadyExists(action.GetResource().GroupResource(), pod.Name) }) - control = NewRealStatefulPodControl(fakeClient, recorder) if err := control.CreateStatefulPod(set, pod); !apierrors.IsAlreadyExists(err) { t.Errorf("Failed to create Pod error: %s", err) } @@ -98,7 +101,7 @@ func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource) }) @@ -109,7 +112,6 @@ func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) { create := action.(core.CreateAction) return true, create.GetObject(), nil }) - control = NewRealStatefulPodControl(fakeClient, recorder) if err := control.CreateStatefulPod(set, pod); err == nil { t.Error("Failed to produce error on PVC creation failure") } @@ -129,7 +131,7 @@ func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) @@ -140,7 +142,6 @@ func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) { create := action.(core.CreateAction) return true, create.GetObject(), nil }) - control = NewRealStatefulPodControl(fakeClient, recorder) if err := control.CreateStatefulPod(set, pod); err == nil { t.Error("Failed to produce error on PVC creation failure") } @@ -160,7 +161,7 @@ func TestStatefulPodControlCreatePodFailed(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource) }) @@ -171,7 +172,6 @@ func TestStatefulPodControlCreatePodFailed(t *testing.T) { fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) - control = NewRealStatefulPodControl(fakeClient, recorder) if err := control.CreateStatefulPod(set, pod); err == nil { t.Error("Failed to produce error on Pod creation failure") } @@ -192,7 +192,7 @@ func TestStatefulPodControlNoOpUpdate(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { t.Error("no-op update should not make any client invocation") return true, nil, apierrors.NewInternalError(errors.New("If we are here we have a problem")) @@ -210,14 +210,15 @@ func TestStatefulPodControlUpdatesIdentity(t *testing.T) { recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) - fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) - fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { + fakeClient := fake.NewSimpleClientset(set, pod) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) + var updated *v1.Pod + fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { update := action.(core.UpdateAction) + updated = update.GetObject().(*v1.Pod) return true, update.GetObject(), nil }) pod.Name = "goo-0" - control = NewRealStatefulPodControl(fakeClient, recorder) if err := control.UpdateStatefulPod(set, pod); err != nil { t.Errorf("Successful update returned an error: %s", err) } @@ -227,7 +228,7 @@ func TestStatefulPodControlUpdatesIdentity(t *testing.T) { } else if !strings.Contains(events[0], v1.EventTypeNormal) { t.Errorf("Expected normal event found %s", events[0]) } - if !identityMatches(set, pod) { + if !identityMatches(set, updated) { t.Error("Name update failed identity does not match") } } @@ -237,14 +238,19 @@ func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + gooPod := newStatefulSetPod(set, 0) + gooPod.Name = "goo-0" + indexer.Add(gooPod) + podLister := corelisters.NewPodLister(indexer) + control := NewRealStatefulPodControl(fakeClient, nil, podLister, recorder) fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { + pod.Name = "goo-0" return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) pod.Name = "goo-0" - control = NewRealStatefulPodControl(fakeClient, recorder) if err := control.UpdateStatefulPod(set, pod); err == nil { - t.Error("Falied update does not generate an error") + t.Error("Failed update does not generate an error") } events := collectEvents(recorder.Events) if eventCount := len(events); eventCount != 1 { @@ -262,7 +268,7 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) pvcs := getPersistentVolumeClaims(set, pod) volumes := make([]v1.Volume, len(pod.Spec.Volumes)) for i := range pod.Spec.Volumes { @@ -282,7 +288,12 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) { update := action.(core.UpdateAction) return true, update.GetObject(), nil }) - control = NewRealStatefulPodControl(fakeClient, recorder) + var updated *v1.Pod + fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { + update := action.(core.UpdateAction) + updated = update.GetObject().(*v1.Pod) + return true, update.GetObject(), nil + }) if err := control.UpdateStatefulPod(set, pod); err != nil { t.Errorf("Successful update returned an error: %s", err) } @@ -295,7 +306,7 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) { t.Errorf("Expected normal event found %s", events[i]) } } - if !storageMatches(set, pod) { + if !storageMatches(set, updated) { t.Error("Name update failed identity does not match") } } @@ -305,7 +316,7 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) pvcs := getPersistentVolumeClaims(set, pod) volumes := make([]v1.Volume, len(pod.Spec.Volumes)) for i := range pod.Spec.Volumes { @@ -324,7 +335,6 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) { fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) - control = NewRealStatefulPodControl(fakeClient, recorder) if err := control.UpdateStatefulPod(set, pod); err == nil { t.Error("Failed Pod storage update did not return an error") } @@ -337,9 +347,6 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) { t.Errorf("Expected normal event found %s", events[i]) } } - if storageMatches(set, pod) { - t.Error("Storag matches on failed update") - } } func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) { @@ -347,24 +354,23 @@ func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) - attempts := 0 + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + gooPod := newStatefulSetPod(set, 0) + gooPod.Name = "goo-0" + indexer.Add(gooPod) + podLister := corelisters.NewPodLister(indexer) + control := NewRealStatefulPodControl(fakeClient, nil, podLister, recorder) + conflict := false fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { update := action.(core.UpdateAction) - if attempts < maxUpdateRetries/2 { - attempts++ + if !conflict { + conflict = true return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict")) } else { return true, update.GetObject(), nil } }) - fakeClient.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) { - pod.Name = "goo-0" - return true, pod, nil - - }) pod.Name = "goo-0" - control = NewRealStatefulPodControl(fakeClient, recorder) if err := control.UpdateStatefulPod(set, pod); err != nil { t.Errorf("Successful update returned an error: %s", err) } @@ -384,20 +390,20 @@ func TestStatefulPodControlUpdatePodConflictFailure(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + updatedPod := newStatefulSetPod(set, 0) + updatedPod.Annotations[podapi.PodHostnameAnnotation] = "wrong" + indexer.Add(updatedPod) + podLister := corelisters.NewPodLister(indexer) + control := NewRealStatefulPodControl(fakeClient, nil, podLister, recorder) fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { update := action.(core.UpdateAction) return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict")) - }) - fakeClient.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) { - return true, nil, apierrors.NewInternalError(errors.New("API server down")) - }) pod.Name = "goo-0" - control = NewRealStatefulPodControl(fakeClient, recorder) if err := control.UpdateStatefulPod(set, pod); err == nil { - t.Error("Falied update did not reaturn an error") + t.Error("Failed update did not return an error") } events := collectEvents(recorder.Events) if eventCount := len(events); eventCount != 1 { @@ -405,41 +411,6 @@ func TestStatefulPodControlUpdatePodConflictFailure(t *testing.T) { } else if !strings.Contains(events[0], v1.EventTypeWarning) { t.Errorf("Expected normal event found %s", events[0]) } - if identityMatches(set, pod) { - t.Error("Identity matches on failed update") - } -} - -func TestStatefulPodControlUpdatePodConflictMaxRetries(t *testing.T) { - recorder := record.NewFakeRecorder(10) - set := newStatefulSet(3) - pod := newStatefulSetPod(set, 0) - fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) - fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { - update := action.(core.UpdateAction) - return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict")) - - }) - fakeClient.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) { - pod.Name = "goo-0" - return true, pod, nil - - }) - pod.Name = "goo-0" - control = NewRealStatefulPodControl(fakeClient, recorder) - if err := control.UpdateStatefulPod(set, pod); err == nil { - t.Error("Falied update did not reaturn an error") - } - events := collectEvents(recorder.Events) - if eventCount := len(events); eventCount != 1 { - t.Errorf("Expected 1 event for failed Pod update found %d", eventCount) - } else if !strings.Contains(events[0], v1.EventTypeWarning) { - t.Errorf("Expected normal event found %s", events[0]) - } - if identityMatches(set, pod) { - t.Error("Identity matches on failed update") - } } func TestStatefulPodControlDeletesStatefulPod(t *testing.T) { @@ -447,7 +418,7 @@ func TestStatefulPodControlDeletesStatefulPod(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) { return true, nil, nil }) @@ -467,7 +438,7 @@ func TestStatefulPodControlDeleteFailure(t *testing.T) { set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) @@ -486,7 +457,7 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) { recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder) fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { update := action.(core.UpdateAction) return true, update.GetObject(), nil @@ -506,18 +477,17 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) { func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) { recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) - replicas := set.Status.Replicas fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + indexer.Add(set) + setLister := appslisters.NewStatefulSetLister(indexer) + control := NewRealStatefulPodControl(fakeClient, setLister, nil, recorder) fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) if err := control.UpdateStatefulSetReplicas(set, 2); err == nil { t.Error("Failed update did not return error") } - if set.Status.Replicas != replicas { - t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", replicas) - } events := collectEvents(recorder.Events) if eventCount := len(events); eventCount != 0 { t.Errorf("Expected 0 events for successful status update %d", eventCount) @@ -527,22 +497,21 @@ func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) { func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) { recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) - attempts := 0 + conflict := false fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + indexer.Add(set) + setLister := appslisters.NewStatefulSetLister(indexer) + control := NewRealStatefulPodControl(fakeClient, setLister, nil, recorder) fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { - update := action.(core.UpdateAction) - if attempts < maxUpdateRetries/2 { - attempts++ + if !conflict { + conflict = true return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists")) } else { return true, update.GetObject(), nil } }) - fakeClient.AddReactor("get", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { - return true, set, nil - }) if err := control.UpdateStatefulSetReplicas(set, 2); err != nil { t.Errorf("UpdateStatefulSetStatus returned an error: %s", err) } @@ -558,46 +527,18 @@ func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) { func TestStatefulPodControlUpdateReplicasConflictFailure(t *testing.T) { recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) - replicas := set.Status.Replicas fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + indexer.Add(set) + setLister := appslisters.NewStatefulSetLister(indexer) + control := NewRealStatefulPodControl(fakeClient, setLister, nil, recorder) fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { update := action.(core.UpdateAction) return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists")) }) - fakeClient.AddReactor("get", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { - return true, nil, apierrors.NewInternalError(errors.New("API server down")) - }) if err := control.UpdateStatefulSetReplicas(set, 2); err == nil { t.Error("UpdateStatefulSetStatus failed to return an error on get failure") } - if set.Status.Replicas != replicas { - t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas) - } - events := collectEvents(recorder.Events) - if eventCount := len(events); eventCount != 0 { - t.Errorf("Expected 0 events for successful status update %d", eventCount) - } -} - -func TestStatefulPodControlUpdateReplicasConflictMaxRetries(t *testing.T) { - recorder := record.NewFakeRecorder(10) - set := newStatefulSet(3) - replicas := set.Status.Replicas - fakeClient := &fake.Clientset{} - control := NewRealStatefulPodControl(fakeClient, recorder) - fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { - return true, newStatefulSet(3), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists")) - }) - fakeClient.AddReactor("get", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { - return true, newStatefulSet(3), nil - }) - if err := control.UpdateStatefulSetReplicas(set, 2); err == nil { - t.Error("UpdateStatefulSetStatus failure did not return an error ") - } - if set.Status.Replicas != replicas { - t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas) - } events := collectEvents(recorder.Events) if eventCount := len(events); eventCount != 0 { t.Errorf("Expected 0 events for successful status update %d", eventCount) diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index 5041529c8fd..102e3c49bdf 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -22,23 +22,23 @@ import ( "sort" "time" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - "k8s.io/kubernetes/pkg/client/legacylisters" - - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/util/workqueue" + appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" + appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/controller" "github.com/golang/glog" @@ -56,20 +56,24 @@ type StatefulSetController struct { // control returns an interface capable of syncing a stateful set. // Abstracted out for testing. control StatefulSetControlInterface - // podStore is a cache of watched pods. - podStore listers.StoreToPodLister - // podStoreSynced returns true if the pod store has synced at least once. - podStoreSynced cache.InformerSynced - // A store of StatefulSets, populated by setController. - setStore listers.StoreToStatefulSetLister - // Watches changes to all StatefulSets. - setController cache.Controller + // podLister is able to list/get pods from a shared informer's store + podLister corelisters.PodLister + // podListerSynced returns true if the pod shared informer has synced at least once + podListerSynced cache.InformerSynced + // setLister is able to list/get stateful sets from a shared informer's store + setLister appslisters.StatefulSetLister + // setListerSynced returns true if the stateful set shared informer has synced at least once + setListerSynced cache.InformerSynced // StatefulSets that need to be synced. queue workqueue.RateLimitingInterface } // NewStatefulSetController creates a new statefulset controller. -func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod time.Duration) *StatefulSetController { +func NewStatefulSetController( + podInformer coreinformers.PodInformer, + setInformer appsinformers.StatefulSetInformer, + kubeClient clientset.Interface, +) *StatefulSetController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")}) @@ -77,11 +81,11 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient ssc := &StatefulSetController{ kubeClient: kubeClient, - control: NewDefaultStatefulSetControl(NewRealStatefulPodControl(kubeClient, recorder)), + control: NewDefaultStatefulSetControl(NewRealStatefulPodControl(kubeClient, setInformer.Lister(), podInformer.Lister(), recorder)), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"), } - podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ // lookup the statefulset and enqueue AddFunc: ssc.addPod, // lookup current and old statefulset if labels changed @@ -89,19 +93,10 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient // lookup statefulset accounting for deletion tombstones DeleteFunc: ssc.deletePod, }) - ssc.podStore.Indexer = podInformer.GetIndexer() + ssc.podLister = podInformer.Lister() + ssc.podListerSynced = podInformer.Informer().HasSynced - ssc.setStore.Store, ssc.setController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return ssc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return ssc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).Watch(options) - }, - }, - &apps.StatefulSet{}, - statefulSetResyncPeriod, + setInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: ssc.enqueueStatefulSet, UpdateFunc: func(old, cur interface{}) { @@ -114,9 +109,12 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient }, DeleteFunc: ssc.enqueueStatefulSet, }, + statefulSetResyncPeriod, ) + ssc.setLister = setInformer.Lister() + ssc.setListerSynced = setInformer.Informer().HasSynced + // TODO: Watch volumes - ssc.podStoreSynced = podInformer.GetController().HasSynced return ssc } @@ -124,18 +122,20 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer ssc.queue.ShutDown() + glog.Infof("Starting statefulset controller") - if !cache.WaitForCacheSync(stopCh, ssc.podStoreSynced) { + + if !cache.WaitForCacheSync(stopCh, ssc.podListerSynced, ssc.setListerSynced) { utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) return } - go ssc.setController.Run(stopCh) + for i := 0; i < workers; i++ { go wait.Until(ssc.worker, time.Second, stopCh) } + <-stopCh glog.Infof("Shutting down statefulset controller") - } // addPod adds the statefulset for the pod to the sync queue @@ -204,12 +204,12 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet) ( if err != nil { return []*v1.Pod{}, err } - return ssc.podStore.Pods(set.Namespace).List(sel) + return ssc.podLister.Pods(set.Namespace).List(sel) } // getStatefulSetForPod returns the StatefulSet managing the given pod. func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet { - sets, err := ssc.setStore.GetPodStatefulSets(pod) + sets, err := ssc.setLister.GetPodStatefulSets(pod) if err != nil { glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) return nil @@ -225,14 +225,14 @@ func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.Statef sort.Sort(overlappingStatefulSets(sets)) // return the first created set for which pod is a member for i := range sets { - if isMemberOf(&sets[i], pod) { - return &sets[i] + if isMemberOf(sets[i], pod) { + return sets[i] } } glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) return nil } - return &sets[0] + return sets[0] } @@ -276,8 +276,12 @@ func (ssc *StatefulSetController) sync(key string) error { glog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Now().Sub(startTime)) }() - obj, exists, err := ssc.setStore.Store.GetByKey(key) - if !exists { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + set, err := ssc.setLister.StatefulSets(namespace).Get(name) + if errors.IsNotFound(err) { glog.Infof("StatefulSet has been deleted %v", key) return nil } @@ -286,13 +290,12 @@ func (ssc *StatefulSetController) sync(key string) error { return err } - set := *obj.(*apps.StatefulSet) - pods, err := ssc.getPodsForStatefulSet(&set) + pods, err := ssc.getPodsForStatefulSet(set) if err != nil { return err } - return ssc.syncStatefulSet(&set, pods) + return ssc.syncStatefulSet(set, pods) } // syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod). diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 0bf74291d1f..dcac5e3ecbf 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -17,8 +17,10 @@ limitations under the License. package statefulset import ( + "fmt" "sort" + "k8s.io/client-go/pkg/api" "k8s.io/kubernetes/pkg/api/v1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" @@ -97,8 +99,16 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p sort.Sort(ascendingOrdinal(condemned)) // if the current number of replicas has changed update the statefulSets replicas - if err := ssc.podControl.UpdateStatefulSetReplicas(set, int32(ready)); err != nil { - return err + if set.Status.Replicas != int32(ready) { + obj, err := api.Scheme.Copy(set) + if err != nil { + return fmt.Errorf("unable to copy set: %v", err) + } + set = obj.(*apps.StatefulSet) + + if err := ssc.podControl.UpdateStatefulSetReplicas(set, int32(ready)); err != nil { + return err + } } // Examine each replica with respect to its ordinal @@ -123,8 +133,17 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p set.Name, replicas[i].Name) return nil } - // Enforce the StatefulSet invariants, - if err := ssc.podControl.UpdateStatefulPod(set, replicas[i]); err != nil { + // Enforce the StatefulSet invariants + if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) { + continue + } + // Make a deep copy so we don't mutate the shared cache + copy, err := api.Scheme.DeepCopy(replicas[i]) + if err != nil { + return err + } + replica := copy.(*v1.Pod) + if err := ssc.podControl.UpdateStatefulPod(set, replica); err != nil { return err } } diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 447f9777c10..190ea0054db 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -26,31 +26,67 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/api/v1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" + appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1" listers "k8s.io/kubernetes/pkg/client/legacylisters" + appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1" + corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/pkg/controller" ) func TestDefaultStatefulSetControlCreatesPods(t *testing.T) { - spc := newFakeStatefulPodControl() - ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } if set.Status.Replicas != 3 { - t.Error("Falied to scale statefulset to 3 replicas") + t.Error("Failed to scale statefulset to 3 replicas") } } func TestStatefulSetControlScaleUp(t *testing.T) { - spc := newFakeStatefulPodControl() - ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } @@ -58,15 +94,33 @@ func TestStatefulSetControlScaleUp(t *testing.T) { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { t.Errorf("Failed to scale StatefulSet : %s", err) } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } if set.Status.Replicas != 4 { - t.Error("Falied to scale statefulset to 4 replicas") + t.Error("Failed to scale statefulset to 4 replicas") } } func TestStatefulSetControlScaleDown(t *testing.T) { - spc := newFakeStatefulPodControl() - ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } @@ -75,19 +129,37 @@ func TestStatefulSetControlScaleDown(t *testing.T) { t.Errorf("Failed to scale StatefulSet : %s", err) } if set.Status.Replicas != 0 { - t.Error("Falied to scale statefulset to 4 replicas") + t.Error("Failed to scale statefulset to 0 replicas") } } func TestStatefulSetControlReplacesPods(t *testing.T) { - spc := newFakeStatefulPodControl() - ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(5) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } if set.Status.Replicas != 5 { - t.Error("Falied to scale statefulset to 5 replicas") + t.Error("Failed to scale statefulset to 5 replicas") } selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { @@ -109,12 +181,20 @@ func TestStatefulSetControlReplacesPods(t *testing.T) { if err = ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } if pods, err = spc.setPodRunning(set, i); err != nil { t.Error(err) } if err = ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } if pods, err = spc.setPodReady(set, i); err != nil { t.Error(err) } @@ -126,13 +206,19 @@ func TestStatefulSetControlReplacesPods(t *testing.T) { if err := ssc.UpdateStatefulSet(set, pods); err != nil { t.Errorf("Failed to update StatefulSet : %s", err) } - if set.Status.Replicas != 5 { - t.Error("Falied to scale StatefulSet to 5 replicas") + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + if e, a := int32(5), set.Status.Replicas; e != a { + t.Errorf("Expected to scale to %d, got %d", e, a) } } func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { - spc := newFakeStatefulPodControl() + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(3) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) @@ -171,9 +257,22 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { } func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) { - spc := newFakeStatefulPodControl() - ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { t.Error(err) @@ -213,63 +312,149 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } if int(set.Status.Replicas) != 3 { t.Errorf("StatefulSetControl does not unblock on %s=true", apps.StatefulSetInitAnnotation) } } func TestDefaultStatefulSetControlCreatePodFailure(t *testing.T) { - spc := newFakeStatefulPodControl() - ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { - t.Errorf("StatefulSetControl did not return InternalError foudn %s", err) + t.Errorf("StatefulSetControl did not return InternalError found %s", err) } if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } if set.Status.Replicas != 3 { - t.Error("Falied to scale StatefulSet to 3 replicas") + t.Error("Failed to scale StatefulSet to 3 replicas") } } func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) { - spc := newFakeStatefulPodControl() - ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(3) - spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) - if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { - t.Errorf("StatefulSetControl did not return InternalError foudn %s", err) - } + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + + // have to have 1 successful loop first if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { - t.Errorf("Failed to turn up StatefulSet : %s", err) + t.Fatalf("Unexpected error: %v", err) + } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) } if set.Status.Replicas != 3 { - t.Error("Falied to scale StatefulSet to 3 replicas") + t.Error("Failed to scale StatefulSet to 3 replicas") + } + + // now mutate a pod's identity + pods, err := spc.podsLister.List(labels.Everything()) + if err != nil { + t.Fatalf("Error listing pods: %v", err) + } + if len(pods) != 3 { + t.Fatalf("Expected 3 pods, got %d", len(pods)) + } + sort.Sort(ascendingOrdinal(pods)) + pods[0].Name = "goo-0" + spc.podsIndexer.Update(pods[0]) + + // now it should fail + if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) { + t.Errorf("StatefulSetControl did not return InternalError found %s", err) } } func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) { - spc := newFakeStatefulPodControl() - ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) spc.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { - t.Errorf("StatefulSetControl did not return InternalError foudn %s", err) + t.Errorf("StatefulSetControl did not return InternalError found %s", err) } if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } if set.Status.Replicas != 3 { - t.Error("Falied to scale StatefulSet to 3 replicas") + t.Error("Failed to scale StatefulSet to 3 replicas") } } func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) { - spc := newFakeStatefulPodControl() - ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { t.Error(err) @@ -313,22 +498,44 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) { } func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { - spc := newFakeStatefulPodControl() - ssc := NewDefaultStatefulSetControl(spc) set := newStatefulSet(3) + client := fake.NewSimpleClientset(set) + + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewDefaultStatefulSetControl(spc) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + cache.WaitForCacheSync( + stop, + informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, + informerFactory.Core().V1().Pods().Informer().HasSynced, + ) + if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } + var err error + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } *set.Spec.Replicas = 0 spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) if err := scaleDownStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { - t.Errorf("StatefulSetControl failed to throw error on delte %s", err) + t.Errorf("StatefulSetControl failed to throw error on delete %s", err) } if err := scaleDownStatefulSetControl(set, ssc, spc); err != nil { t.Errorf("Failed to turn down StatefulSet %s", err) } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } if set.Status.Replicas != 0 { - t.Error("Falied to scale statefulset to 4 replicas") + t.Error("Failed to scale statefulset to 0 replicas") } } @@ -352,9 +559,9 @@ func (rt *requestTracker) reset() { } type fakeStatefulPodControl struct { - podsLister listers.StoreToPodLister + podsLister corelisters.PodLister claimsLister listers.StoreToPersistentVolumeClaimLister - setsLister listers.StoreToStatefulSetLister + setsLister appslisters.StatefulSetLister podsIndexer cache.Indexer claimsIndexer cache.Indexer setsIndexer cache.Indexer @@ -364,20 +571,16 @@ type fakeStatefulPodControl struct { updateStatusTracker requestTracker } -func newFakeStatefulPodControl() *fakeStatefulPodControl { - podsIndexer := cache.NewIndexer(controller.KeyFunc, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) +func newFakeStatefulPodControl(podInformer coreinformers.PodInformer, setInformer appsinformers.StatefulSetInformer) *fakeStatefulPodControl { claimsIndexer := cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - setsIndexer := cache.NewIndexer(controller.KeyFunc, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) return &fakeStatefulPodControl{ - listers.StoreToPodLister{Indexer: podsIndexer}, + podInformer.Lister(), listers.StoreToPersistentVolumeClaimLister{Indexer: claimsIndexer}, - listers.StoreToStatefulSetLister{Store: setsIndexer}, - podsIndexer, + setInformer.Lister(), + podInformer.Informer().GetIndexer(), claimsIndexer, - setsIndexer, + setInformer.Informer().GetIndexer(), requestTracker{0, nil, 0}, requestTracker{0, nil, 0}, requestTracker{0, nil, 0}, @@ -654,12 +857,20 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInte if err = ssc.UpdateStatefulSet(set, pods); err != nil { return err } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } if pods, err = spc.setPodRunning(set, ord); err != nil { return err } if err = ssc.UpdateStatefulSet(set, pods); err != nil { return err } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } if pods, err = spc.setPodReady(set, ord); err != nil { return err } @@ -667,6 +878,10 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInte if err := ssc.UpdateStatefulSet(set, pods); err != nil { return err } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } if err := assertInvariants(set, spc); err != nil { return err } @@ -688,12 +903,20 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn if err := ssc.UpdateStatefulSet(set, pods); err != nil { return err } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } if pods, err = spc.addTerminatedPod(set, ordinal); err != nil { return err } if err = ssc.UpdateStatefulSet(set, pods); err != nil { return err } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } pods, err = spc.podsLister.Pods(set.Namespace).List(selector) if err != nil { return err @@ -704,6 +927,10 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn if err := ssc.UpdateStatefulSet(set, pods); err != nil { return err } + set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } if err := assertInvariants(set, spc); err != nil { return err } diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 1d126f67eee..333e00f1597 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -24,13 +24,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api/v1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/controller" ) +func alwaysReady() bool { return true } + func TestStatefulSetControllerCreates(t *testing.T) { ssc, spc := newFakeStatefulSetController() set := newStatefulSet(3) @@ -343,15 +346,18 @@ func TestStatefulSetControllerGetStatefulSetForPodOverlapping(t *testing.T) { } func newFakeStatefulSetController() (*StatefulSetController, *fakeStatefulPodControl) { - fpc := newFakeStatefulPodControl() - ssc := &StatefulSetController{ - kubeClient: nil, - podStoreSynced: func() bool { return true }, - setStore: fpc.setsLister, - podStore: fpc.podsLister, - control: NewDefaultStatefulSetControl(fpc), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"), - } + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + fpc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) + ssc := NewStatefulSetController( + informerFactory.Core().V1().Pods(), + informerFactory.Apps().V1beta1().StatefulSets(), + client, + ) + ssc.podListerSynced = alwaysReady + ssc.setListerSynced = alwaysReady + ssc.control = NewDefaultStatefulSetControl(fpc) + return ssc, fpc } diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index c8c3ec1ed90..69e13c6c4b8 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -38,7 +38,7 @@ var updateConflictError = fmt.Errorf("aborting update after %d attempts", maxUpd // overlappingStatefulSets sorts a list of StatefulSets by creation timestamp, using their names as a tie breaker. // Generally used to tie break between StatefulSets that have overlapping selectors. -type overlappingStatefulSets []apps.StatefulSet +type overlappingStatefulSets []*apps.StatefulSet func (o overlappingStatefulSets) Len() int { return len(o) } diff --git a/pkg/controller/statefulset/stateful_set_utils_test.go b/pkg/controller/statefulset/stateful_set_utils_test.go index 2b340752484..b6e80473985 100644 --- a/pkg/controller/statefulset/stateful_set_utils_test.go +++ b/pkg/controller/statefulset/stateful_set_utils_test.go @@ -251,10 +251,10 @@ func TestAscendingOrdinal(t *testing.T) { } func TestOverlappingStatefulSets(t *testing.T) { - sets := make([]apps.StatefulSet, 10) + sets := make([]*apps.StatefulSet, 10) perm := rand.Perm(10) for i, v := range perm { - sets[i] = *newStatefulSet(10) + sets[i] = newStatefulSet(10) sets[i].CreationTimestamp = metav1.NewTime(sets[i].CreationTimestamp.Add(time.Duration(v) * time.Second)) } sort.Sort(overlappingStatefulSets(sets)) @@ -262,7 +262,7 @@ func TestOverlappingStatefulSets(t *testing.T) { t.Error("ascendingOrdinal fails to sort Pods") } for i, v := range perm { - sets[i] = *newStatefulSet(10) + sets[i] = newStatefulSet(10) sets[i].Name = strconv.FormatInt(int64(v), 10) } sort.Sort(overlappingStatefulSets(sets)) diff --git a/test/e2e/examples.go b/test/e2e/examples.go index 79cced8bb80..b86eda7c5e1 100644 --- a/test/e2e/examples.go +++ b/test/e2e/examples.go @@ -264,20 +264,19 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { framework.KubeDescribe("CassandraStatefulSet", func() { It("should create statefulset", func() { mkpath := func(file string) string { - return filepath.Join(framework.TestContext.RepoRoot, "examples/storage/cassandra", file) + return filepath.Join("examples/storage/cassandra", file) } serviceYaml := mkpath("cassandra-service.yaml") nsFlag := fmt.Sprintf("--namespace=%v", ns) // have to change dns prefix because of the dynamic namespace - input, err := ioutil.ReadFile(mkpath("cassandra-statefulset.yaml")) - Expect(err).NotTo(HaveOccurred()) + input := generated.ReadOrDie(mkpath("cassandra-statefulset.yaml")) output := strings.Replace(string(input), "cassandra-0.cassandra.default.svc.cluster.local", "cassandra-0.cassandra."+ns+".svc.cluster.local", -1) statefulsetYaml := "/tmp/cassandra-statefulset.yaml" - err = ioutil.WriteFile(statefulsetYaml, []byte(output), 0644) + err := ioutil.WriteFile(statefulsetYaml, []byte(output), 0644) Expect(err).NotTo(HaveOccurred()) By("Starting the cassandra service") diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index 44a6610a181..74af7cf7d83 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -71,6 +71,7 @@ go_library( "//pkg/util/version:go_default_library", "//plugin/pkg/scheduler/algorithm/predicates:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", + "//test/e2e/generated:go_default_library", "//test/e2e/perftype:go_default_library", "//test/utils:go_default_library", "//vendor:github.com/golang/glog", diff --git a/test/e2e/framework/statefulset_utils.go b/test/e2e/framework/statefulset_utils.go index 3efaa01bc28..e782b5a023d 100644 --- a/test/e2e/framework/statefulset_utils.go +++ b/test/e2e/framework/statefulset_utils.go @@ -18,7 +18,6 @@ package framework import ( "fmt" - "io/ioutil" "path/filepath" "strconv" "strings" @@ -38,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/test/e2e/generated" ) const ( @@ -70,8 +70,7 @@ func CreateStatefulSetService(name string, labels map[string]string) *v1.Service func StatefulSetFromManifest(fileName, ns string) *apps.StatefulSet { var ss apps.StatefulSet Logf("Parsing statefulset from %v", fileName) - data, err := ioutil.ReadFile(fileName) - Expect(err).NotTo(HaveOccurred()) + data := generated.ReadOrDie(fileName) json, err := utilyaml.ToJSON(data) Expect(err).NotTo(HaveOccurred()) @@ -99,7 +98,7 @@ func NewStatefulSetTester(c clientset.Interface) *StatefulSetTester { // CreateStatefulSet creates a StatefulSet from the manifest at manifestPath in the Namespace ns using kubectl create. func (s *StatefulSetTester) CreateStatefulSet(manifestPath, ns string) *apps.StatefulSet { mkpath := func(file string) string { - return filepath.Join(TestContext.RepoRoot, manifestPath, file) + return filepath.Join(manifestPath, file) } ss := StatefulSetFromManifest(mkpath("statefulset.yaml"), ns)