Merge pull request #41482 from ncdc/shared-informers-11-statefulset

Automatic merge from submit-queue (batch tested with PRs 41146, 41486, 41482, 41538, 41784)

Switch statefulset controller to shared informers

Originally part of #40097 

I *think* the controller currently makes a deep copy of a StatefulSet before it mutates it, but I'm not 100% sure. For those who are most familiar with this code, could you please confirm?

@beeps @smarterclayton @ingvagabund @sttts @liggitt @deads2k @kubernetes/sig-apps-pr-reviews @kubernetes/sig-scalability-pr-reviews @timothysc @gmarek @wojtek-t
This commit is contained in:
Kubernetes Submit Queue 2017-02-22 21:09:35 -08:00 committed by GitHub
commit 4396f19c61
13 changed files with 520 additions and 313 deletions

View File

@ -29,11 +29,10 @@ func startStatefulSetController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] {
return false, nil return false, nil
} }
resyncPeriod := ResyncPeriod(&ctx.Options)()
go statefulset.NewStatefulSetController( go statefulset.NewStatefulSetController(
ctx.InformerFactory.Pods().Informer(), ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Apps().V1beta1().StatefulSets(),
ctx.ClientBuilder.ClientOrDie("statefulset-controller"), ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
resyncPeriod,
).Run(1, ctx.Stop) ).Run(1, ctx.Stop)
return true, nil return true, nil
} }

View File

@ -23,17 +23,20 @@ go_library(
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library", "//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/listers/apps/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/retry:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api",
"//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record", "//vendor:k8s.io/client-go/tools/record",
@ -56,17 +59,22 @@ go_test(
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/legacylisters:go_default_library", "//pkg/client/legacylisters:go_default_library",
"//pkg/client/listers/apps/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/client-go/testing", "//vendor:k8s.io/client-go/testing",
"//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record", "//vendor:k8s.io/client-go/tools/record",
"//vendor:k8s.io/client-go/util/workqueue",
], ],
) )

View File

@ -23,11 +23,15 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
errorutils "k8s.io/apimachinery/pkg/util/errors" errorutils "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/client/retry"
) )
// StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods, // StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods,
@ -52,15 +56,17 @@ type StatefulPodControlInterface interface {
UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error
} }
func NewRealStatefulPodControl(client clientset.Interface, recorder record.EventRecorder) StatefulPodControlInterface { func NewRealStatefulPodControl(client clientset.Interface, setLister appslisters.StatefulSetLister, podLister corelisters.PodLister, recorder record.EventRecorder) StatefulPodControlInterface {
return &realStatefulPodControl{client, recorder} return &realStatefulPodControl{client, setLister, podLister, recorder}
} }
// realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the // realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the
// API server. The struct is package private as the internal details are irrelevant to importing packages. // API server. The struct is package private as the internal details are irrelevant to importing packages.
type realStatefulPodControl struct { type realStatefulPodControl struct {
client clientset.Interface client clientset.Interface
recorder record.EventRecorder setLister appslisters.StatefulSetLister
podLister corelisters.PodLister
recorder record.EventRecorder
} }
func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
@ -80,54 +86,56 @@ func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod
} }
func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
// we make a copy of the Pod on the stack and mutate the copy attemptedUpdate := false
// we copy back to pod to notify the caller of successful mutation
obj, err := api.Scheme.Copy(pod) err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err != nil {
return fmt.Errorf("unable to copy pod: %v", err)
}
podCopy := obj.(*v1.Pod)
for attempt := 0; attempt < maxUpdateRetries; attempt++ {
// assume the Pod is consistent // assume the Pod is consistent
consistent := true consistent := true
// if the Pod does not conform to it's identity, update the identity and dirty the Pod // if the Pod does not conform to its identity, update the identity and dirty the Pod
if !identityMatches(set, podCopy) { if !identityMatches(set, pod) {
updateIdentity(set, podCopy) updateIdentity(set, pod)
consistent = false consistent = false
} }
// if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's, // if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's,
// dirty the Pod, and create any missing PVCs // dirty the Pod, and create any missing PVCs
if !storageMatches(set, podCopy) { if !storageMatches(set, pod) {
updateStorage(set, podCopy) updateStorage(set, pod)
consistent = false consistent = false
if err := spc.createPersistentVolumeClaims(set, podCopy); err != nil { if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err) spc.recordPodEvent("update", set, pod, err)
return err return err
} }
} }
// if the Pod is not dirty do nothing // if the Pod is not dirty do nothing
if consistent { if consistent {
*pod = *podCopy
return nil return nil
} }
attemptedUpdate = true
// commit the update, retrying on conflicts // commit the update, retrying on conflicts
_, err = spc.client.Core().Pods(set.Namespace).Update(podCopy) _, err := spc.client.Core().Pods(set.Namespace).Update(pod)
if !apierrors.IsConflict(err) { if err == nil {
if err == nil { return nil
*pod = *podCopy }
updateErr := err
if updated, err := spc.podLister.Pods(set.Namespace).Get(pod.Name); err == nil {
// make a copy so we don't mutate the shared cache
if copy, err := api.Scheme.DeepCopy(updated); err == nil {
pod = copy.(*v1.Pod)
} else {
utilruntime.HandleError(fmt.Errorf("error copying updated Pod: %v", err))
} }
spc.recordPodEvent("update", set, pod, err) } else {
return err utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", set.Namespace, pod.Name, err))
} }
conflicting, err := spc.client.Core().Pods(set.Namespace).Get(podCopy.Name, metav1.GetOptions{})
if err != nil { return updateErr
spc.recordPodEvent("update", set, podCopy, err) })
return err if attemptedUpdate {
} spc.recordPodEvent("update", set, pod, err)
*podCopy = *conflicting
} }
spc.recordPodEvent("update", set, pod, updateConflictError) return err
return updateConflictError
} }
func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
@ -137,31 +145,28 @@ func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod
} }
func (spc *realStatefulPodControl) UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error { func (spc *realStatefulPodControl) UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error {
if set.Status.Replicas == replicas { return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return nil set.Status.Replicas = replicas
} _, err := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
obj, err := api.Scheme.Copy(set) if err == nil {
if err != nil { return nil
return fmt.Errorf("unable to copy set: %v", err) }
}
setCopy := obj.(*apps.StatefulSet) updateErr := err
setCopy.Status.Replicas = replicas
for attempt := 0; attempt < maxUpdateRetries; attempt++ { if updated, err := spc.setLister.StatefulSets(set.Namespace).Get(set.Name); err == nil {
_, err := spc.client.Apps().StatefulSets(setCopy.Namespace).UpdateStatus(setCopy) // make a copy so we don't mutate the shared cache
if !apierrors.IsConflict(err) { if copy, err := api.Scheme.DeepCopy(updated); err == nil {
if err == nil { set = copy.(*apps.StatefulSet)
*set = *setCopy } else {
utilruntime.HandleError(fmt.Errorf("error copying updated StatefulSet: %v", err))
} }
return err } else {
utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", set.Namespace, set.Name, err))
} }
conflicting, err := spc.client.Apps().StatefulSets(setCopy.Namespace).Get(setCopy.Name, metav1.GetOptions{})
if err != nil { return updateErr
return err })
}
conflicting.Status.Replicas = setCopy.Status.Replicas
*setCopy = *conflicting
}
return updateConflictError
} }
// recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will // recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will

View File

@ -25,10 +25,14 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
podapi "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
) )
func TestStatefulPodControlCreatesPods(t *testing.T) { func TestStatefulPodControlCreatesPods(t *testing.T) {
@ -36,7 +40,7 @@ func TestStatefulPodControlCreatesPods(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource) return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
}) })
@ -67,7 +71,7 @@ func TestStatefulPodControlCreatePodExists(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
pvcs := getPersistentVolumeClaims(set, pod) pvcs := getPersistentVolumeClaims(set, pod)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
claim := pvcs[action.GetResource().GroupResource().Resource] claim := pvcs[action.GetResource().GroupResource().Resource]
@ -80,7 +84,6 @@ func TestStatefulPodControlCreatePodExists(t *testing.T) {
fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, pod, apierrors.NewAlreadyExists(action.GetResource().GroupResource(), pod.Name) return true, pod, apierrors.NewAlreadyExists(action.GetResource().GroupResource(), pod.Name)
}) })
control = NewRealStatefulPodControl(fakeClient, recorder)
if err := control.CreateStatefulPod(set, pod); !apierrors.IsAlreadyExists(err) { if err := control.CreateStatefulPod(set, pod); !apierrors.IsAlreadyExists(err) {
t.Errorf("Failed to create Pod error: %s", err) t.Errorf("Failed to create Pod error: %s", err)
} }
@ -98,7 +101,7 @@ func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource) return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
}) })
@ -109,7 +112,6 @@ func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) {
create := action.(core.CreateAction) create := action.(core.CreateAction)
return true, create.GetObject(), nil return true, create.GetObject(), nil
}) })
control = NewRealStatefulPodControl(fakeClient, recorder)
if err := control.CreateStatefulPod(set, pod); err == nil { if err := control.CreateStatefulPod(set, pod); err == nil {
t.Error("Failed to produce error on PVC creation failure") t.Error("Failed to produce error on PVC creation failure")
} }
@ -129,7 +131,7 @@ func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down")) return true, nil, apierrors.NewInternalError(errors.New("API server down"))
}) })
@ -140,7 +142,6 @@ func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) {
create := action.(core.CreateAction) create := action.(core.CreateAction)
return true, create.GetObject(), nil return true, create.GetObject(), nil
}) })
control = NewRealStatefulPodControl(fakeClient, recorder)
if err := control.CreateStatefulPod(set, pod); err == nil { if err := control.CreateStatefulPod(set, pod); err == nil {
t.Error("Failed to produce error on PVC creation failure") t.Error("Failed to produce error on PVC creation failure")
} }
@ -160,7 +161,7 @@ func TestStatefulPodControlCreatePodFailed(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource) return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
}) })
@ -171,7 +172,6 @@ func TestStatefulPodControlCreatePodFailed(t *testing.T) {
fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down")) return true, nil, apierrors.NewInternalError(errors.New("API server down"))
}) })
control = NewRealStatefulPodControl(fakeClient, recorder)
if err := control.CreateStatefulPod(set, pod); err == nil { if err := control.CreateStatefulPod(set, pod); err == nil {
t.Error("Failed to produce error on Pod creation failure") t.Error("Failed to produce error on Pod creation failure")
} }
@ -192,7 +192,7 @@ func TestStatefulPodControlNoOpUpdate(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
t.Error("no-op update should not make any client invocation") t.Error("no-op update should not make any client invocation")
return true, nil, apierrors.NewInternalError(errors.New("If we are here we have a problem")) return true, nil, apierrors.NewInternalError(errors.New("If we are here we have a problem"))
@ -210,14 +210,15 @@ func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
recorder := record.NewFakeRecorder(10) recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := fake.NewSimpleClientset(set, pod)
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { var updated *v1.Pod
fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction) update := action.(core.UpdateAction)
updated = update.GetObject().(*v1.Pod)
return true, update.GetObject(), nil return true, update.GetObject(), nil
}) })
pod.Name = "goo-0" pod.Name = "goo-0"
control = NewRealStatefulPodControl(fakeClient, recorder)
if err := control.UpdateStatefulPod(set, pod); err != nil { if err := control.UpdateStatefulPod(set, pod); err != nil {
t.Errorf("Successful update returned an error: %s", err) t.Errorf("Successful update returned an error: %s", err)
} }
@ -227,7 +228,7 @@ func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
} else if !strings.Contains(events[0], v1.EventTypeNormal) { } else if !strings.Contains(events[0], v1.EventTypeNormal) {
t.Errorf("Expected normal event found %s", events[0]) t.Errorf("Expected normal event found %s", events[0])
} }
if !identityMatches(set, pod) { if !identityMatches(set, updated) {
t.Error("Name update failed identity does not match") t.Error("Name update failed identity does not match")
} }
} }
@ -237,14 +238,19 @@ func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
gooPod := newStatefulSetPod(set, 0)
gooPod.Name = "goo-0"
indexer.Add(gooPod)
podLister := corelisters.NewPodLister(indexer)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, recorder)
fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
pod.Name = "goo-0"
return true, nil, apierrors.NewInternalError(errors.New("API server down")) return true, nil, apierrors.NewInternalError(errors.New("API server down"))
}) })
pod.Name = "goo-0" pod.Name = "goo-0"
control = NewRealStatefulPodControl(fakeClient, recorder)
if err := control.UpdateStatefulPod(set, pod); err == nil { if err := control.UpdateStatefulPod(set, pod); err == nil {
t.Error("Falied update does not generate an error") t.Error("Failed update does not generate an error")
} }
events := collectEvents(recorder.Events) events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 1 { if eventCount := len(events); eventCount != 1 {
@ -262,7 +268,7 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
pvcs := getPersistentVolumeClaims(set, pod) pvcs := getPersistentVolumeClaims(set, pod)
volumes := make([]v1.Volume, len(pod.Spec.Volumes)) volumes := make([]v1.Volume, len(pod.Spec.Volumes))
for i := range pod.Spec.Volumes { for i := range pod.Spec.Volumes {
@ -282,7 +288,12 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
update := action.(core.UpdateAction) update := action.(core.UpdateAction)
return true, update.GetObject(), nil return true, update.GetObject(), nil
}) })
control = NewRealStatefulPodControl(fakeClient, recorder) var updated *v1.Pod
fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
updated = update.GetObject().(*v1.Pod)
return true, update.GetObject(), nil
})
if err := control.UpdateStatefulPod(set, pod); err != nil { if err := control.UpdateStatefulPod(set, pod); err != nil {
t.Errorf("Successful update returned an error: %s", err) t.Errorf("Successful update returned an error: %s", err)
} }
@ -295,7 +306,7 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
t.Errorf("Expected normal event found %s", events[i]) t.Errorf("Expected normal event found %s", events[i])
} }
} }
if !storageMatches(set, pod) { if !storageMatches(set, updated) {
t.Error("Name update failed identity does not match") t.Error("Name update failed identity does not match")
} }
} }
@ -305,7 +316,7 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
pvcs := getPersistentVolumeClaims(set, pod) pvcs := getPersistentVolumeClaims(set, pod)
volumes := make([]v1.Volume, len(pod.Spec.Volumes)) volumes := make([]v1.Volume, len(pod.Spec.Volumes))
for i := range pod.Spec.Volumes { for i := range pod.Spec.Volumes {
@ -324,7 +335,6 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down")) return true, nil, apierrors.NewInternalError(errors.New("API server down"))
}) })
control = NewRealStatefulPodControl(fakeClient, recorder)
if err := control.UpdateStatefulPod(set, pod); err == nil { if err := control.UpdateStatefulPod(set, pod); err == nil {
t.Error("Failed Pod storage update did not return an error") t.Error("Failed Pod storage update did not return an error")
} }
@ -337,9 +347,6 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
t.Errorf("Expected normal event found %s", events[i]) t.Errorf("Expected normal event found %s", events[i])
} }
} }
if storageMatches(set, pod) {
t.Error("Storag matches on failed update")
}
} }
func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) { func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
@ -347,24 +354,23 @@ func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
attempts := 0 gooPod := newStatefulSetPod(set, 0)
gooPod.Name = "goo-0"
indexer.Add(gooPod)
podLister := corelisters.NewPodLister(indexer)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, recorder)
conflict := false
fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction) update := action.(core.UpdateAction)
if attempts < maxUpdateRetries/2 { if !conflict {
attempts++ conflict = true
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict")) return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict"))
} else { } else {
return true, update.GetObject(), nil return true, update.GetObject(), nil
} }
}) })
fakeClient.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
pod.Name = "goo-0"
return true, pod, nil
})
pod.Name = "goo-0" pod.Name = "goo-0"
control = NewRealStatefulPodControl(fakeClient, recorder)
if err := control.UpdateStatefulPod(set, pod); err != nil { if err := control.UpdateStatefulPod(set, pod); err != nil {
t.Errorf("Successful update returned an error: %s", err) t.Errorf("Successful update returned an error: %s", err)
} }
@ -384,20 +390,20 @@ func TestStatefulPodControlUpdatePodConflictFailure(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
updatedPod := newStatefulSetPod(set, 0)
updatedPod.Annotations[podapi.PodHostnameAnnotation] = "wrong"
indexer.Add(updatedPod)
podLister := corelisters.NewPodLister(indexer)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, recorder)
fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction) update := action.(core.UpdateAction)
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict")) return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict"))
})
fakeClient.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
}) })
pod.Name = "goo-0" pod.Name = "goo-0"
control = NewRealStatefulPodControl(fakeClient, recorder)
if err := control.UpdateStatefulPod(set, pod); err == nil { if err := control.UpdateStatefulPod(set, pod); err == nil {
t.Error("Falied update did not reaturn an error") t.Error("Failed update did not return an error")
} }
events := collectEvents(recorder.Events) events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 1 { if eventCount := len(events); eventCount != 1 {
@ -405,41 +411,6 @@ func TestStatefulPodControlUpdatePodConflictFailure(t *testing.T) {
} else if !strings.Contains(events[0], v1.EventTypeWarning) { } else if !strings.Contains(events[0], v1.EventTypeWarning) {
t.Errorf("Expected normal event found %s", events[0]) t.Errorf("Expected normal event found %s", events[0])
} }
if identityMatches(set, pod) {
t.Error("Identity matches on failed update")
}
}
func TestStatefulPodControlUpdatePodConflictMaxRetries(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder)
fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict"))
})
fakeClient.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
pod.Name = "goo-0"
return true, pod, nil
})
pod.Name = "goo-0"
control = NewRealStatefulPodControl(fakeClient, recorder)
if err := control.UpdateStatefulPod(set, pod); err == nil {
t.Error("Falied update did not reaturn an error")
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 1 {
t.Errorf("Expected 1 event for failed Pod update found %d", eventCount)
} else if !strings.Contains(events[0], v1.EventTypeWarning) {
t.Errorf("Expected normal event found %s", events[0])
}
if identityMatches(set, pod) {
t.Error("Identity matches on failed update")
}
} }
func TestStatefulPodControlDeletesStatefulPod(t *testing.T) { func TestStatefulPodControlDeletesStatefulPod(t *testing.T) {
@ -447,7 +418,7 @@ func TestStatefulPodControlDeletesStatefulPod(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, nil return true, nil, nil
}) })
@ -467,7 +438,7 @@ func TestStatefulPodControlDeleteFailure(t *testing.T) {
set := newStatefulSet(3) set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0) pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down")) return true, nil, apierrors.NewInternalError(errors.New("API server down"))
}) })
@ -486,7 +457,7 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) {
recorder := record.NewFakeRecorder(10) recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3) set := newStatefulSet(3)
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction) update := action.(core.UpdateAction)
return true, update.GetObject(), nil return true, update.GetObject(), nil
@ -506,18 +477,17 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) {
func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) { func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) {
recorder := record.NewFakeRecorder(10) recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3) set := newStatefulSet(3)
replicas := set.Status.Replicas
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down")) return true, nil, apierrors.NewInternalError(errors.New("API server down"))
}) })
if err := control.UpdateStatefulSetReplicas(set, 2); err == nil { if err := control.UpdateStatefulSetReplicas(set, 2); err == nil {
t.Error("Failed update did not return error") t.Error("Failed update did not return error")
} }
if set.Status.Replicas != replicas {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", replicas)
}
events := collectEvents(recorder.Events) events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 { if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount) t.Errorf("Expected 0 events for successful status update %d", eventCount)
@ -527,22 +497,21 @@ func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) {
func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) { func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) {
recorder := record.NewFakeRecorder(10) recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3) set := newStatefulSet(3)
attempts := 0 conflict := false
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction) update := action.(core.UpdateAction)
if attempts < maxUpdateRetries/2 { if !conflict {
attempts++ conflict = true
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists")) return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
} else { } else {
return true, update.GetObject(), nil return true, update.GetObject(), nil
} }
}) })
fakeClient.AddReactor("get", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, set, nil
})
if err := control.UpdateStatefulSetReplicas(set, 2); err != nil { if err := control.UpdateStatefulSetReplicas(set, 2); err != nil {
t.Errorf("UpdateStatefulSetStatus returned an error: %s", err) t.Errorf("UpdateStatefulSetStatus returned an error: %s", err)
} }
@ -558,46 +527,18 @@ func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) {
func TestStatefulPodControlUpdateReplicasConflictFailure(t *testing.T) { func TestStatefulPodControlUpdateReplicasConflictFailure(t *testing.T) {
recorder := record.NewFakeRecorder(10) recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3) set := newStatefulSet(3)
replicas := set.Status.Replicas
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder) indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) { fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction) update := action.(core.UpdateAction)
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists")) return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
}) })
fakeClient.AddReactor("get", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
if err := control.UpdateStatefulSetReplicas(set, 2); err == nil { if err := control.UpdateStatefulSetReplicas(set, 2); err == nil {
t.Error("UpdateStatefulSetStatus failed to return an error on get failure") t.Error("UpdateStatefulSetStatus failed to return an error on get failure")
} }
if set.Status.Replicas != replicas {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas)
}
events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount)
}
}
func TestStatefulPodControlUpdateReplicasConflictMaxRetries(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
replicas := set.Status.Replicas
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, newStatefulSet(3), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))
})
fakeClient.AddReactor("get", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, newStatefulSet(3), nil
})
if err := control.UpdateStatefulSetReplicas(set, 2); err == nil {
t.Error("UpdateStatefulSetStatus failure did not return an error ")
}
if set.Status.Replicas != replicas {
t.Errorf("UpdateStatefulSetStatus mutated the sets replicas %d", set.Status.Replicas)
}
events := collectEvents(recorder.Events) events := collectEvents(recorder.Events)
if eventCount := len(events); eventCount != 0 { if eventCount := len(events); eventCount != 0 {
t.Errorf("Expected 0 events for successful status update %d", eventCount) t.Errorf("Expected 0 events for successful status update %d", eventCount)

View File

@ -22,23 +22,23 @@ import (
"sort" "sort"
"time" "time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1" clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters" appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
"k8s.io/apimachinery/pkg/runtime" appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog" "github.com/golang/glog"
@ -56,20 +56,24 @@ type StatefulSetController struct {
// control returns an interface capable of syncing a stateful set. // control returns an interface capable of syncing a stateful set.
// Abstracted out for testing. // Abstracted out for testing.
control StatefulSetControlInterface control StatefulSetControlInterface
// podStore is a cache of watched pods. // podLister is able to list/get pods from a shared informer's store
podStore listers.StoreToPodLister podLister corelisters.PodLister
// podStoreSynced returns true if the pod store has synced at least once. // podListerSynced returns true if the pod shared informer has synced at least once
podStoreSynced cache.InformerSynced podListerSynced cache.InformerSynced
// A store of StatefulSets, populated by setController. // setLister is able to list/get stateful sets from a shared informer's store
setStore listers.StoreToStatefulSetLister setLister appslisters.StatefulSetLister
// Watches changes to all StatefulSets. // setListerSynced returns true if the stateful set shared informer has synced at least once
setController cache.Controller setListerSynced cache.InformerSynced
// StatefulSets that need to be synced. // StatefulSets that need to be synced.
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
} }
// NewStatefulSetController creates a new statefulset controller. // NewStatefulSetController creates a new statefulset controller.
func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod time.Duration) *StatefulSetController { func NewStatefulSetController(
podInformer coreinformers.PodInformer,
setInformer appsinformers.StatefulSetInformer,
kubeClient clientset.Interface,
) *StatefulSetController {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")}) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
@ -77,11 +81,11 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient
ssc := &StatefulSetController{ ssc := &StatefulSetController{
kubeClient: kubeClient, kubeClient: kubeClient,
control: NewDefaultStatefulSetControl(NewRealStatefulPodControl(kubeClient, recorder)), control: NewDefaultStatefulSetControl(NewRealStatefulPodControl(kubeClient, setInformer.Lister(), podInformer.Lister(), recorder)),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
} }
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// lookup the statefulset and enqueue // lookup the statefulset and enqueue
AddFunc: ssc.addPod, AddFunc: ssc.addPod,
// lookup current and old statefulset if labels changed // lookup current and old statefulset if labels changed
@ -89,19 +93,10 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient
// lookup statefulset accounting for deletion tombstones // lookup statefulset accounting for deletion tombstones
DeleteFunc: ssc.deletePod, DeleteFunc: ssc.deletePod,
}) })
ssc.podStore.Indexer = podInformer.GetIndexer() ssc.podLister = podInformer.Lister()
ssc.podListerSynced = podInformer.Informer().HasSynced
ssc.setStore.Store, ssc.setController = cache.NewInformer( setInformer.Informer().AddEventHandlerWithResyncPeriod(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ssc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ssc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).Watch(options)
},
},
&apps.StatefulSet{},
statefulSetResyncPeriod,
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: ssc.enqueueStatefulSet, AddFunc: ssc.enqueueStatefulSet,
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
@ -114,9 +109,12 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient
}, },
DeleteFunc: ssc.enqueueStatefulSet, DeleteFunc: ssc.enqueueStatefulSet,
}, },
statefulSetResyncPeriod,
) )
ssc.setLister = setInformer.Lister()
ssc.setListerSynced = setInformer.Informer().HasSynced
// TODO: Watch volumes // TODO: Watch volumes
ssc.podStoreSynced = podInformer.GetController().HasSynced
return ssc return ssc
} }
@ -124,18 +122,20 @@ func NewStatefulSetController(podInformer cache.SharedIndexInformer, kubeClient
func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) { func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer ssc.queue.ShutDown() defer ssc.queue.ShutDown()
glog.Infof("Starting statefulset controller") glog.Infof("Starting statefulset controller")
if !cache.WaitForCacheSync(stopCh, ssc.podStoreSynced) {
if !cache.WaitForCacheSync(stopCh, ssc.podListerSynced, ssc.setListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return return
} }
go ssc.setController.Run(stopCh)
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.Until(ssc.worker, time.Second, stopCh) go wait.Until(ssc.worker, time.Second, stopCh)
} }
<-stopCh <-stopCh
glog.Infof("Shutting down statefulset controller") glog.Infof("Shutting down statefulset controller")
} }
// addPod adds the statefulset for the pod to the sync queue // addPod adds the statefulset for the pod to the sync queue
@ -204,12 +204,12 @@ func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet) (
if err != nil { if err != nil {
return []*v1.Pod{}, err return []*v1.Pod{}, err
} }
return ssc.podStore.Pods(set.Namespace).List(sel) return ssc.podLister.Pods(set.Namespace).List(sel)
} }
// getStatefulSetForPod returns the StatefulSet managing the given pod. // getStatefulSetForPod returns the StatefulSet managing the given pod.
func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet { func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.StatefulSet {
sets, err := ssc.setStore.GetPodStatefulSets(pod) sets, err := ssc.setLister.GetPodStatefulSets(pod)
if err != nil { if err != nil {
glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name)
return nil return nil
@ -225,14 +225,14 @@ func (ssc *StatefulSetController) getStatefulSetForPod(pod *v1.Pod) *apps.Statef
sort.Sort(overlappingStatefulSets(sets)) sort.Sort(overlappingStatefulSets(sets))
// return the first created set for which pod is a member // return the first created set for which pod is a member
for i := range sets { for i := range sets {
if isMemberOf(&sets[i], pod) { if isMemberOf(sets[i], pod) {
return &sets[i] return sets[i]
} }
} }
glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name) glog.V(4).Infof("No StatefulSets found for pod %v, StatefulSet controller will avoid syncing", pod.Name)
return nil return nil
} }
return &sets[0] return sets[0]
} }
@ -276,8 +276,12 @@ func (ssc *StatefulSetController) sync(key string) error {
glog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Now().Sub(startTime)) glog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Now().Sub(startTime))
}() }()
obj, exists, err := ssc.setStore.Store.GetByKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
if !exists { if err != nil {
return err
}
set, err := ssc.setLister.StatefulSets(namespace).Get(name)
if errors.IsNotFound(err) {
glog.Infof("StatefulSet has been deleted %v", key) glog.Infof("StatefulSet has been deleted %v", key)
return nil return nil
} }
@ -286,13 +290,12 @@ func (ssc *StatefulSetController) sync(key string) error {
return err return err
} }
set := *obj.(*apps.StatefulSet) pods, err := ssc.getPodsForStatefulSet(set)
pods, err := ssc.getPodsForStatefulSet(&set)
if err != nil { if err != nil {
return err return err
} }
return ssc.syncStatefulSet(&set, pods) return ssc.syncStatefulSet(set, pods)
} }
// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod). // syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).

View File

@ -17,8 +17,10 @@ limitations under the License.
package statefulset package statefulset
import ( import (
"fmt"
"sort" "sort"
"k8s.io/client-go/pkg/api"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
@ -97,8 +99,16 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
sort.Sort(ascendingOrdinal(condemned)) sort.Sort(ascendingOrdinal(condemned))
// if the current number of replicas has changed update the statefulSets replicas // if the current number of replicas has changed update the statefulSets replicas
if err := ssc.podControl.UpdateStatefulSetReplicas(set, int32(ready)); err != nil { if set.Status.Replicas != int32(ready) {
return err obj, err := api.Scheme.Copy(set)
if err != nil {
return fmt.Errorf("unable to copy set: %v", err)
}
set = obj.(*apps.StatefulSet)
if err := ssc.podControl.UpdateStatefulSetReplicas(set, int32(ready)); err != nil {
return err
}
} }
// Examine each replica with respect to its ordinal // Examine each replica with respect to its ordinal
@ -123,8 +133,17 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
set.Name, replicas[i].Name) set.Name, replicas[i].Name)
return nil return nil
} }
// Enforce the StatefulSet invariants, // Enforce the StatefulSet invariants
if err := ssc.podControl.UpdateStatefulPod(set, replicas[i]); err != nil { if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
continue
}
// Make a deep copy so we don't mutate the shared cache
copy, err := api.Scheme.DeepCopy(replicas[i])
if err != nil {
return err
}
replica := copy.(*v1.Pod)
if err := ssc.podControl.UpdateStatefulPod(set, replica); err != nil {
return err return err
} }
} }

View File

@ -26,31 +26,67 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
listers "k8s.io/kubernetes/pkg/client/legacylisters" listers "k8s.io/kubernetes/pkg/client/legacylisters"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
) )
func TestDefaultStatefulSetControlCreatesPods(t *testing.T) { func TestDefaultStatefulSetControlCreatesPods(t *testing.T) {
spc := newFakeStatefulPodControl()
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3) set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
var err error
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if set.Status.Replicas != 3 { if set.Status.Replicas != 3 {
t.Error("Falied to scale statefulset to 3 replicas") t.Error("Failed to scale statefulset to 3 replicas")
} }
} }
func TestStatefulSetControlScaleUp(t *testing.T) { func TestStatefulSetControlScaleUp(t *testing.T) {
spc := newFakeStatefulPodControl()
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3) set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
@ -58,15 +94,33 @@ func TestStatefulSetControlScaleUp(t *testing.T) {
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to scale StatefulSet : %s", err) t.Errorf("Failed to scale StatefulSet : %s", err)
} }
var err error
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if set.Status.Replicas != 4 { if set.Status.Replicas != 4 {
t.Error("Falied to scale statefulset to 4 replicas") t.Error("Failed to scale statefulset to 4 replicas")
} }
} }
func TestStatefulSetControlScaleDown(t *testing.T) { func TestStatefulSetControlScaleDown(t *testing.T) {
spc := newFakeStatefulPodControl()
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3) set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
@ -75,19 +129,37 @@ func TestStatefulSetControlScaleDown(t *testing.T) {
t.Errorf("Failed to scale StatefulSet : %s", err) t.Errorf("Failed to scale StatefulSet : %s", err)
} }
if set.Status.Replicas != 0 { if set.Status.Replicas != 0 {
t.Error("Falied to scale statefulset to 4 replicas") t.Error("Failed to scale statefulset to 0 replicas")
} }
} }
func TestStatefulSetControlReplacesPods(t *testing.T) { func TestStatefulSetControlReplacesPods(t *testing.T) {
spc := newFakeStatefulPodControl()
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(5) set := newStatefulSet(5)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
var err error
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if set.Status.Replicas != 5 { if set.Status.Replicas != 5 {
t.Error("Falied to scale statefulset to 5 replicas") t.Error("Failed to scale statefulset to 5 replicas")
} }
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
@ -109,12 +181,20 @@ func TestStatefulSetControlReplacesPods(t *testing.T) {
if err = ssc.UpdateStatefulSet(set, pods); err != nil { if err = ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err) t.Errorf("Failed to update StatefulSet : %s", err)
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if pods, err = spc.setPodRunning(set, i); err != nil { if pods, err = spc.setPodRunning(set, i); err != nil {
t.Error(err) t.Error(err)
} }
if err = ssc.UpdateStatefulSet(set, pods); err != nil { if err = ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err) t.Errorf("Failed to update StatefulSet : %s", err)
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if pods, err = spc.setPodReady(set, i); err != nil { if pods, err = spc.setPodReady(set, i); err != nil {
t.Error(err) t.Error(err)
} }
@ -126,13 +206,19 @@ func TestStatefulSetControlReplacesPods(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err) t.Errorf("Failed to update StatefulSet : %s", err)
} }
if set.Status.Replicas != 5 { set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
t.Error("Falied to scale StatefulSet to 5 replicas") if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if e, a := int32(5), set.Status.Replicas; e != a {
t.Errorf("Expected to scale to %d, got %d", e, a)
} }
} }
func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) {
spc := newFakeStatefulPodControl() client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc) ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3) set := newStatefulSet(3)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
@ -171,9 +257,22 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) {
} }
func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) { func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
spc := newFakeStatefulPodControl()
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3) set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -213,63 +312,149 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if int(set.Status.Replicas) != 3 { if int(set.Status.Replicas) != 3 {
t.Errorf("StatefulSetControl does not unblock on %s=true", apps.StatefulSetInitAnnotation) t.Errorf("StatefulSetControl does not unblock on %s=true", apps.StatefulSetInitAnnotation)
} }
} }
func TestDefaultStatefulSetControlCreatePodFailure(t *testing.T) { func TestDefaultStatefulSetControlCreatePodFailure(t *testing.T) {
spc := newFakeStatefulPodControl()
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3) set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError foudn %s", err) t.Errorf("StatefulSetControl did not return InternalError found %s", err)
} }
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
var err error
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if set.Status.Replicas != 3 { if set.Status.Replicas != 3 {
t.Error("Falied to scale StatefulSet to 3 replicas") t.Error("Failed to scale StatefulSet to 3 replicas")
} }
} }
func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) { func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) {
spc := newFakeStatefulPodControl()
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3) set := newStatefulSet(3)
spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) client := fake.NewSimpleClientset(set)
if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError foudn %s", err) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
} spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
// have to have 1 successful loop first
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Fatalf("Unexpected error: %v", err)
}
var err error
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
} }
if set.Status.Replicas != 3 { if set.Status.Replicas != 3 {
t.Error("Falied to scale StatefulSet to 3 replicas") t.Error("Failed to scale StatefulSet to 3 replicas")
}
// now mutate a pod's identity
pods, err := spc.podsLister.List(labels.Everything())
if err != nil {
t.Fatalf("Error listing pods: %v", err)
}
if len(pods) != 3 {
t.Fatalf("Expected 3 pods, got %d", len(pods))
}
sort.Sort(ascendingOrdinal(pods))
pods[0].Name = "goo-0"
spc.podsIndexer.Update(pods[0])
// now it should fail
if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err)
} }
} }
func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) { func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) {
spc := newFakeStatefulPodControl()
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3) set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) spc.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError foudn %s", err) t.Errorf("StatefulSetControl did not return InternalError found %s", err)
} }
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
var err error
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if set.Status.Replicas != 3 { if set.Status.Replicas != 3 {
t.Error("Falied to scale StatefulSet to 3 replicas") t.Error("Failed to scale StatefulSet to 3 replicas")
} }
} }
func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) { func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) {
spc := newFakeStatefulPodControl()
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3) set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -313,22 +498,44 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) {
} }
func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
spc := newFakeStatefulPodControl()
ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3) set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
var err error
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
*set.Spec.Replicas = 0 *set.Spec.Replicas = 0
spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleDownStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { if err := scaleDownStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl failed to throw error on delte %s", err) t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
} }
if err := scaleDownStatefulSetControl(set, ssc, spc); err != nil { if err := scaleDownStatefulSetControl(set, ssc, spc); err != nil {
t.Errorf("Failed to turn down StatefulSet %s", err) t.Errorf("Failed to turn down StatefulSet %s", err)
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if set.Status.Replicas != 0 { if set.Status.Replicas != 0 {
t.Error("Falied to scale statefulset to 4 replicas") t.Error("Failed to scale statefulset to 0 replicas")
} }
} }
@ -352,9 +559,9 @@ func (rt *requestTracker) reset() {
} }
type fakeStatefulPodControl struct { type fakeStatefulPodControl struct {
podsLister listers.StoreToPodLister podsLister corelisters.PodLister
claimsLister listers.StoreToPersistentVolumeClaimLister claimsLister listers.StoreToPersistentVolumeClaimLister
setsLister listers.StoreToStatefulSetLister setsLister appslisters.StatefulSetLister
podsIndexer cache.Indexer podsIndexer cache.Indexer
claimsIndexer cache.Indexer claimsIndexer cache.Indexer
setsIndexer cache.Indexer setsIndexer cache.Indexer
@ -364,20 +571,16 @@ type fakeStatefulPodControl struct {
updateStatusTracker requestTracker updateStatusTracker requestTracker
} }
func newFakeStatefulPodControl() *fakeStatefulPodControl { func newFakeStatefulPodControl(podInformer coreinformers.PodInformer, setInformer appsinformers.StatefulSetInformer) *fakeStatefulPodControl {
podsIndexer := cache.NewIndexer(controller.KeyFunc,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
claimsIndexer := cache.NewIndexer(controller.KeyFunc, claimsIndexer := cache.NewIndexer(controller.KeyFunc,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
setsIndexer := cache.NewIndexer(controller.KeyFunc,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
return &fakeStatefulPodControl{ return &fakeStatefulPodControl{
listers.StoreToPodLister{Indexer: podsIndexer}, podInformer.Lister(),
listers.StoreToPersistentVolumeClaimLister{Indexer: claimsIndexer}, listers.StoreToPersistentVolumeClaimLister{Indexer: claimsIndexer},
listers.StoreToStatefulSetLister{Store: setsIndexer}, setInformer.Lister(),
podsIndexer, podInformer.Informer().GetIndexer(),
claimsIndexer, claimsIndexer,
setsIndexer, setInformer.Informer().GetIndexer(),
requestTracker{0, nil, 0}, requestTracker{0, nil, 0},
requestTracker{0, nil, 0}, requestTracker{0, nil, 0},
requestTracker{0, nil, 0}, requestTracker{0, nil, 0},
@ -654,12 +857,20 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInte
if err = ssc.UpdateStatefulSet(set, pods); err != nil { if err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err return err
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
if pods, err = spc.setPodRunning(set, ord); err != nil { if pods, err = spc.setPodRunning(set, ord); err != nil {
return err return err
} }
if err = ssc.UpdateStatefulSet(set, pods); err != nil { if err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err return err
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
if pods, err = spc.setPodReady(set, ord); err != nil { if pods, err = spc.setPodReady(set, ord); err != nil {
return err return err
} }
@ -667,6 +878,10 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInte
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if err := ssc.UpdateStatefulSet(set, pods); err != nil {
return err return err
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
if err := assertInvariants(set, spc); err != nil { if err := assertInvariants(set, spc); err != nil {
return err return err
} }
@ -688,12 +903,20 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if err := ssc.UpdateStatefulSet(set, pods); err != nil {
return err return err
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
if pods, err = spc.addTerminatedPod(set, ordinal); err != nil { if pods, err = spc.addTerminatedPod(set, ordinal); err != nil {
return err return err
} }
if err = ssc.UpdateStatefulSet(set, pods); err != nil { if err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err return err
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector) pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil { if err != nil {
return err return err
@ -704,6 +927,10 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if err := ssc.UpdateStatefulSet(set, pods); err != nil {
return err return err
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
if err := assertInvariants(set, spc); err != nil { if err := assertInvariants(set, spc); err != nil {
return err return err
} }

View File

@ -24,13 +24,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
) )
func alwaysReady() bool { return true }
func TestStatefulSetControllerCreates(t *testing.T) { func TestStatefulSetControllerCreates(t *testing.T) {
ssc, spc := newFakeStatefulSetController() ssc, spc := newFakeStatefulSetController()
set := newStatefulSet(3) set := newStatefulSet(3)
@ -343,15 +346,18 @@ func TestStatefulSetControllerGetStatefulSetForPodOverlapping(t *testing.T) {
} }
func newFakeStatefulSetController() (*StatefulSetController, *fakeStatefulPodControl) { func newFakeStatefulSetController() (*StatefulSetController, *fakeStatefulPodControl) {
fpc := newFakeStatefulPodControl() client := fake.NewSimpleClientset()
ssc := &StatefulSetController{ informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
kubeClient: nil, fpc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
podStoreSynced: func() bool { return true }, ssc := NewStatefulSetController(
setStore: fpc.setsLister, informerFactory.Core().V1().Pods(),
podStore: fpc.podsLister, informerFactory.Apps().V1beta1().StatefulSets(),
control: NewDefaultStatefulSetControl(fpc), client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"), )
} ssc.podListerSynced = alwaysReady
ssc.setListerSynced = alwaysReady
ssc.control = NewDefaultStatefulSetControl(fpc)
return ssc, fpc return ssc, fpc
} }

View File

@ -38,7 +38,7 @@ var updateConflictError = fmt.Errorf("aborting update after %d attempts", maxUpd
// overlappingStatefulSets sorts a list of StatefulSets by creation timestamp, using their names as a tie breaker. // overlappingStatefulSets sorts a list of StatefulSets by creation timestamp, using their names as a tie breaker.
// Generally used to tie break between StatefulSets that have overlapping selectors. // Generally used to tie break between StatefulSets that have overlapping selectors.
type overlappingStatefulSets []apps.StatefulSet type overlappingStatefulSets []*apps.StatefulSet
func (o overlappingStatefulSets) Len() int { return len(o) } func (o overlappingStatefulSets) Len() int { return len(o) }

View File

@ -251,10 +251,10 @@ func TestAscendingOrdinal(t *testing.T) {
} }
func TestOverlappingStatefulSets(t *testing.T) { func TestOverlappingStatefulSets(t *testing.T) {
sets := make([]apps.StatefulSet, 10) sets := make([]*apps.StatefulSet, 10)
perm := rand.Perm(10) perm := rand.Perm(10)
for i, v := range perm { for i, v := range perm {
sets[i] = *newStatefulSet(10) sets[i] = newStatefulSet(10)
sets[i].CreationTimestamp = metav1.NewTime(sets[i].CreationTimestamp.Add(time.Duration(v) * time.Second)) sets[i].CreationTimestamp = metav1.NewTime(sets[i].CreationTimestamp.Add(time.Duration(v) * time.Second))
} }
sort.Sort(overlappingStatefulSets(sets)) sort.Sort(overlappingStatefulSets(sets))
@ -262,7 +262,7 @@ func TestOverlappingStatefulSets(t *testing.T) {
t.Error("ascendingOrdinal fails to sort Pods") t.Error("ascendingOrdinal fails to sort Pods")
} }
for i, v := range perm { for i, v := range perm {
sets[i] = *newStatefulSet(10) sets[i] = newStatefulSet(10)
sets[i].Name = strconv.FormatInt(int64(v), 10) sets[i].Name = strconv.FormatInt(int64(v), 10)
} }
sort.Sort(overlappingStatefulSets(sets)) sort.Sort(overlappingStatefulSets(sets))

View File

@ -264,20 +264,19 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
framework.KubeDescribe("CassandraStatefulSet", func() { framework.KubeDescribe("CassandraStatefulSet", func() {
It("should create statefulset", func() { It("should create statefulset", func() {
mkpath := func(file string) string { mkpath := func(file string) string {
return filepath.Join(framework.TestContext.RepoRoot, "examples/storage/cassandra", file) return filepath.Join("examples/storage/cassandra", file)
} }
serviceYaml := mkpath("cassandra-service.yaml") serviceYaml := mkpath("cassandra-service.yaml")
nsFlag := fmt.Sprintf("--namespace=%v", ns) nsFlag := fmt.Sprintf("--namespace=%v", ns)
// have to change dns prefix because of the dynamic namespace // have to change dns prefix because of the dynamic namespace
input, err := ioutil.ReadFile(mkpath("cassandra-statefulset.yaml")) input := generated.ReadOrDie(mkpath("cassandra-statefulset.yaml"))
Expect(err).NotTo(HaveOccurred())
output := strings.Replace(string(input), "cassandra-0.cassandra.default.svc.cluster.local", "cassandra-0.cassandra."+ns+".svc.cluster.local", -1) output := strings.Replace(string(input), "cassandra-0.cassandra.default.svc.cluster.local", "cassandra-0.cassandra."+ns+".svc.cluster.local", -1)
statefulsetYaml := "/tmp/cassandra-statefulset.yaml" statefulsetYaml := "/tmp/cassandra-statefulset.yaml"
err = ioutil.WriteFile(statefulsetYaml, []byte(output), 0644) err := ioutil.WriteFile(statefulsetYaml, []byte(output), 0644)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("Starting the cassandra service") By("Starting the cassandra service")

View File

@ -73,6 +73,7 @@ go_library(
"//pkg/volume/util/volumehelper:go_default_library", "//pkg/volume/util/volumehelper:go_default_library",
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library", "//plugin/pkg/scheduler/algorithm/predicates:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library",
"//test/e2e/generated:go_default_library",
"//test/e2e/perftype:go_default_library", "//test/e2e/perftype:go_default_library",
"//test/utils:go_default_library", "//test/utils:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",

View File

@ -18,7 +18,6 @@ package framework
import ( import (
"fmt" "fmt"
"io/ioutil"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
@ -38,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/test/e2e/generated"
) )
const ( const (
@ -70,8 +70,7 @@ func CreateStatefulSetService(name string, labels map[string]string) *v1.Service
func StatefulSetFromManifest(fileName, ns string) *apps.StatefulSet { func StatefulSetFromManifest(fileName, ns string) *apps.StatefulSet {
var ss apps.StatefulSet var ss apps.StatefulSet
Logf("Parsing statefulset from %v", fileName) Logf("Parsing statefulset from %v", fileName)
data, err := ioutil.ReadFile(fileName) data := generated.ReadOrDie(fileName)
Expect(err).NotTo(HaveOccurred())
json, err := utilyaml.ToJSON(data) json, err := utilyaml.ToJSON(data)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -99,7 +98,7 @@ func NewStatefulSetTester(c clientset.Interface) *StatefulSetTester {
// CreateStatefulSet creates a StatefulSet from the manifest at manifestPath in the Namespace ns using kubectl create. // CreateStatefulSet creates a StatefulSet from the manifest at manifestPath in the Namespace ns using kubectl create.
func (s *StatefulSetTester) CreateStatefulSet(manifestPath, ns string) *apps.StatefulSet { func (s *StatefulSetTester) CreateStatefulSet(manifestPath, ns string) *apps.StatefulSet {
mkpath := func(file string) string { mkpath := func(file string) string {
return filepath.Join(TestContext.RepoRoot, manifestPath, file) return filepath.Join(manifestPath, file)
} }
ss := StatefulSetFromManifest(mkpath("statefulset.yaml"), ns) ss := StatefulSetFromManifest(mkpath("statefulset.yaml"), ns)