controller change for statefulset auto-delete (tests)

Change-Id: I16b50e6853bba65fc89c793d2b9b335581c02407
This commit is contained in:
Matthew Cary 2021-06-30 18:41:55 -07:00
parent bce87a3e4f
commit 53b3a6c1d9
5 changed files with 2043 additions and 520 deletions

View File

@ -19,23 +19,27 @@ package statefulset
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
featuregatetesting "k8s.io/component-base/featuregate/testing"
_ "k8s.io/kubernetes/pkg/apis/apps/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/features"
)
func TestStatefulPodControlCreatesPods(t *testing.T) {
@ -43,14 +47,15 @@ func TestStatefulPodControlCreatesPods(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
control := NewStatefulPodControl(fakeClient, nil, claimLister, recorder)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
})
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
create := action.(core.CreateAction)
claimIndexer.Add(create.GetObject())
return true, create.GetObject(), nil
})
fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
@ -83,7 +88,7 @@ func TestStatefulPodControlCreatePodExists(t *testing.T) {
pvcIndexer.Add(&pvc)
}
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
create := action.(core.CreateAction)
return true, create.GetObject(), nil
@ -110,7 +115,7 @@ func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) {
fakeClient := &fake.Clientset{}
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
@ -131,7 +136,7 @@ func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) {
}
}
}
func TestStatefulPodControlCreatePodPvcDeleting(t *testing.T) {
func TestStatefulPodControlCreatePodPVCDeleting(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
@ -145,7 +150,7 @@ func TestStatefulPodControlCreatePodPvcDeleting(t *testing.T) {
pvcIndexer.Add(&pvc)
}
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
create := action.(core.CreateAction)
return true, create.GetObject(), nil
@ -184,7 +189,7 @@ func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) {
fakeClient := &fake.Clientset{}
pvcIndexer := &fakeIndexer{getError: errors.New("API server down")}
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
@ -213,7 +218,7 @@ func TestStatefulPodControlCreatePodFailed(t *testing.T) {
fakeClient := &fake.Clientset{}
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
create := action.(core.CreateAction)
return true, create.GetObject(), nil
@ -232,7 +237,6 @@ func TestStatefulPodControlCreatePodFailed(t *testing.T) {
} else if !strings.Contains(events[1], v1.EventTypeWarning) {
t.Errorf("Found unexpected non-warning event %s", events[1])
}
}
@ -241,7 +245,14 @@ func TestStatefulPodControlNoOpUpdate(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
claims := getPersistentVolumeClaims(set, pod)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
for k := range claims {
claim := claims[k]
indexer.Add(&claim)
}
claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
control := NewStatefulPodControl(fakeClient, nil, claimLister, recorder)
fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
t.Error("no-op update should not make any client invocation")
return true, nil, apierrors.NewInternalError(errors.New("if we are here we have a problem"))
@ -260,7 +271,9 @@ func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := fake.NewSimpleClientset(set, pod)
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
control := NewStatefulPodControl(fakeClient, nil, claimLister, recorder)
var updated *v1.Pod
fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
@ -287,12 +300,14 @@ func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
gooPod := newStatefulSetPod(set, 0)
gooPod.Name = "goo-0"
indexer.Add(gooPod)
podLister := corelisters.NewPodLister(indexer)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, nil, recorder)
podIndexer.Add(gooPod)
podLister := corelisters.NewPodLister(podIndexer)
claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
pod.Name = "goo-0"
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
@ -319,7 +334,7 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
fakeClient := &fake.Clientset{}
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
pvcs := getPersistentVolumeClaims(set, pod)
volumes := make([]v1.Volume, 0, len(pod.Spec.Volumes))
for i := range pod.Spec.Volumes {
@ -366,7 +381,7 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
fakeClient := &fake.Clientset{}
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
pvcs := getPersistentVolumeClaims(set, pod)
volumes := make([]v1.Volume, 0, len(pod.Spec.Volumes))
for i := range pod.Spec.Volumes {
@ -401,12 +416,19 @@ func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podLister := corelisters.NewPodLister(podIndexer)
claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
claimLister := corelisters.NewPersistentVolumeClaimLister(podIndexer)
gooPod := newStatefulSetPod(set, 0)
gooPod.Name = "goo-0"
indexer.Add(gooPod)
podLister := corelisters.NewPodLister(indexer)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, nil, recorder)
gooPod.Labels[apps.StatefulSetPodNameLabel] = "goo-starts"
podIndexer.Add(gooPod)
claims := getPersistentVolumeClaims(set, gooPod)
for k := range claims {
claim := claims[k]
claimIndexer.Add(&claim)
}
control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
conflict := false
fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
@ -417,7 +439,7 @@ func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
return true, update.GetObject(), nil
})
pod.Name = "goo-0"
pod.Labels[apps.StatefulSetPodNameLabel] = "goo-0"
if err := control.UpdateStatefulPod(set, pod); err != nil {
t.Errorf("Successful update returned an error: %s", err)
}
@ -437,7 +459,7 @@ func TestStatefulPodControlDeletesStatefulPod(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
control := NewStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, nil
})
@ -457,7 +479,7 @@ func TestStatefulPodControlDeleteFailure(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
control := NewStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
@ -472,6 +494,344 @@ func TestStatefulPodControlDeleteFailure(t *testing.T) {
}
}
// TestStatefulPodControlClaimsMatchDeletionPolicy verifies the wiring from
// StatefulPodControl.ClaimsMatchRetentionPolicy to claimOwnerMatchesSetAndPod.
// The claimOwnerMatchesSetAndPod logic is tested exhaustively in
// stateful_set_utils_test; this test only covers the plumbing.
// (Fixes the "Polcy" typo in the original test name.)
func TestStatefulPodControlClaimsMatchDeletionPolicy(t *testing.T) {
	fakeClient := &fake.Clientset{}
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
	set := newStatefulSet(3)
	pod := newStatefulSetPod(set, 0)
	claims := getPersistentVolumeClaims(set, pod)
	for k := range claims {
		claim := claims[k]
		indexer.Add(&claim)
	}
	control := NewStatefulPodControl(fakeClient, nil, claimLister, &noopRecorder{})
	// The claims carry no set ownerRef, which matches a Retain/Retain policy.
	set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
		WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
		WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
	}
	if matches, err := control.ClaimsMatchRetentionPolicy(set, pod); err != nil {
		t.Errorf("Unexpected error for ClaimsMatchRetentionPolicy (retain): %v", err)
	} else if !matches {
		t.Error("Unexpected non-match for ClaimsMatchRetentionPolicy (retain)")
	}
	// With WhenDeleted=Delete the claims would need a set ownerRef, which they
	// lack, so the policy must not match.
	set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
		WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
		WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
	}
	if matches, err := control.ClaimsMatchRetentionPolicy(set, pod); err != nil {
		t.Errorf("Unexpected error for ClaimsMatchRetentionPolicy (set deletion): %v", err)
	} else if matches {
		t.Error("Unexpected match for ClaimsMatchRetentionPolicy (set deletion)")
	}
}
func TestStatefulPodControlUpdatePodClaimForRetentionPolicy(t *testing.T) {
// All the update conditions are tested exhaustively in stateful_set_utils_test. This
// tests the wiring from the pod control to that method.
testFn := func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
set := newStatefulSet(3)
set.GetObjectMeta().SetUID("set-123")
pod := newStatefulSetPod(set, 0)
claims := getPersistentVolumeClaims(set, pod)
for k := range claims {
claim := claims[k]
indexer.Add(&claim)
}
control := NewStatefulPodControl(fakeClient, nil, claimLister, &noopRecorder{})
set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
}
if err := control.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
t.Errorf("Unexpected error for UpdatePodClaimForRetentionPolicy (retain): %v", err)
}
expectRef := utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC)
for k := range claims {
claim, err := claimLister.PersistentVolumeClaims(claims[k].Namespace).Get(claims[k].Name)
if err != nil {
t.Errorf("Unexpected error getting Claim %s/%s: %v", claim.Namespace, claim.Name, err)
}
if hasOwnerRef(claim, set) != expectRef {
t.Errorf("Claim %s/%s bad set owner ref", claim.Namespace, claim.Name)
}
}
}
t.Run("StatefulSetAutoDeletePVCEnabled", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
testFn(t)
})
t.Run("StatefulSetAutoDeletePVCDisabled", func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, false)()
testFn(t)
})
}
// TestPodClaimIsStale exercises PodClaimIsStale across the possible states of
// a pod's PVCs: absent from the lister, present without owner references,
// present with an owner ref carrying a UID different from the pod's (stale),
// and present with an owner ref matching the pod's UID.
func TestPodClaimIsStale(t *testing.T) {
	const missing = "missing"  // claim does not exist in the lister
	const exists = "exists"    // claim exists with no owner references
	const stale = "stale"      // claim has a pod owner ref whose UID does not match the pod
	const withRef = "with-ref" // claim has a pod owner ref whose UID matches the pod
	testCases := []struct {
		name        string
		claimStates []string // one state (above) per volume claim template
		expected    bool     // expected result of PodClaimIsStale
		skipPodUID  bool     // when true the pod gets no UID; the table expects stale=true then
	}{
		{
			name:        "all missing",
			claimStates: []string{missing, missing},
			expected:    false,
		},
		{
			name:        "no claims",
			claimStates: []string{},
			expected:    false,
		},
		{
			name:        "exists",
			claimStates: []string{missing, exists},
			expected:    false,
		},
		{
			name:        "all refs",
			claimStates: []string{withRef, withRef},
			expected:    false,
		},
		{
			// A single stale claim makes the whole pod stale, regardless of
			// the other claims' states.
			name:        "stale & exists",
			claimStates: []string{stale, exists},
			expected:    true,
		},
		{
			name:        "stale & missing",
			claimStates: []string{stale, missing},
			expected:    true,
		},
		{
			name:        "withRef & stale",
			claimStates: []string{withRef, stale},
			expected:    true,
		},
		{
			name:        "withRef, no UID",
			claimStates: []string{withRef},
			skipPodUID:  true,
			expected:    true,
		},
	}
	for _, tc := range testCases {
		set := apps.StatefulSet{}
		set.Name = "set"
		set.Namespace = "default"
		// WhenScaled=Delete: the policy under which claim staleness matters.
		set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
			WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			WhenScaled:  apps.DeletePersistentVolumeClaimRetentionPolicyType,
		}
		set.Spec.Selector = &metav1.LabelSelector{MatchLabels: map[string]string{"key": "value"}}
		claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
		for i, claimState := range tc.claimStates {
			claim := v1.PersistentVolumeClaim{}
			claim.Name = fmt.Sprintf("claim-%d", i)
			// The template is appended with the short name ("claim-i");
			// afterwards the local copy is renamed to the instantiated form
			// ("claim-i-set-3") used for pod set-3, and added to the indexer.
			set.Spec.VolumeClaimTemplates = append(set.Spec.VolumeClaimTemplates, claim)
			claim.Name = fmt.Sprintf("%s-set-3", claim.Name)
			claim.Namespace = set.Namespace
			switch claimState {
			case missing:
				// Do nothing, the claim shouldn't exist.
			case exists:
				claimIndexer.Add(&claim)
			case stale:
				// Owner ref with a UID that will not match the pod's "123".
				claim.SetOwnerReferences([]metav1.OwnerReference{
					{Name: "set-3", UID: types.UID("stale")},
				})
				claimIndexer.Add(&claim)
			case withRef:
				// Owner ref whose UID matches the pod UID set below.
				claim.SetOwnerReferences([]metav1.OwnerReference{
					{Name: "set-3", UID: types.UID("123")},
				})
				claimIndexer.Add(&claim)
			}
		}
		pod := v1.Pod{}
		pod.Name = "set-3"
		if !tc.skipPodUID {
			pod.SetUID("123")
		}
		claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
		control := NewStatefulPodControl(&fake.Clientset{}, nil, claimLister, &noopRecorder{})
		expected := tc.expected
		// Note that the error isn't / can't be tested.
		if stale, _ := control.PodClaimIsStale(&set, &pod); stale != expected {
			t.Errorf("unexpected stale for %s", tc.name)
		}
	}
}
// TestStatefulPodControlRetainDeletionPolicyUpdate verifies that updating a
// pod whose set has a Retain/Retain claim retention policy removes stale set
// ownerRefs from the pod's claims, under both feature gate settings.
func TestStatefulPodControlRetainDeletionPolicyUpdate(t *testing.T) {
	testFn := func(t *testing.T) {
		recorder := record.NewFakeRecorder(10)
		set := newStatefulSet(1)
		set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
			WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
		}
		pod := newStatefulSetPod(set, 0)
		fakeClient := &fake.Clientset{}
		podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
		podLister := corelisters.NewPodLister(podIndexer)
		claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
		claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
		podIndexer.Add(pod)
		claims := getPersistentVolumeClaims(set, pod)
		if len(claims) < 1 {
			t.Error("Unexpected missing PVCs")
		}
		for k := range claims {
			claim := claims[k]
			setOwnerRef(&claim, set, &set.TypeMeta) // This ownerRef should be removed in the update.
			claimIndexer.Add(&claim)
		}
		control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
		if err := control.UpdateStatefulPod(set, pod); err != nil {
			t.Errorf("Successful update returned an error: %s", err)
		}
		// NOTE(review): this loop inspects the local map copies, which never
		// had setOwnerRef applied (the first loop mutated per-iteration
		// copies). Consider fetching the claims via claimLister to assert the
		// ref was actually removed — TODO confirm intended behavior.
		for k := range claims {
			claim := claims[k]
			if hasOwnerRef(&claim, set) {
				t.Errorf("ownerRef not removed: %s/%s", claim.Namespace, claim.Name)
			}
		}
		events := collectEvents(recorder.Events)
		if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
			if eventCount := len(events); eventCount != 1 {
				// This is an update test; the original message said "delete failed".
				t.Errorf("update failed: got %d events, but want 1", eventCount)
			}
		} else {
			if len(events) != 0 {
				t.Errorf("update failed: expected no events, but got %v", events)
			}
		}
	}
	t.Run("StatefulSetAutoDeletePVCEnabled", func(t *testing.T) {
		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
		testFn(t)
	})
	t.Run("StatefulSetAutoDeletePVCDisabled", func(t *testing.T) {
		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, false)()
		testFn(t)
	})
}
// TestStatefulPodControlRetentionPolicyUpdate verifies that updating a pod
// whose set has a WhenDeleted=Delete retention policy adds the set ownerRef
// to the pod's claim and emits exactly one update event.
func TestStatefulPodControlRetentionPolicyUpdate(t *testing.T) {
	// Only applicable when the feature gate is on; the off case is tested in
	// TestStatefulPodControlRetainDeletionPolicyUpdate.
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
	recorder := record.NewFakeRecorder(10)
	set := newStatefulSet(1)
	set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
		WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
		WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
	}
	pod := newStatefulSetPod(set, 0)
	fakeClient := &fake.Clientset{}
	podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	podIndexer.Add(pod)
	claims := getPersistentVolumeClaims(set, pod)
	if len(claims) != 1 {
		t.Errorf("Unexpected or missing PVCs")
	}
	var claim v1.PersistentVolumeClaim
	for k := range claims {
		claim = claims[k]
		claimIndexer.Add(&claim)
	}
	// Mirror claim updates into the indexer so claimLister observes them.
	fakeClient.AddReactor("update", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
		update := action.(core.UpdateAction)
		claimIndexer.Update(update.GetObject())
		return true, update.GetObject(), nil
	})
	podLister := corelisters.NewPodLister(podIndexer)
	claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
	control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
	if err := control.UpdateStatefulPod(set, pod); err != nil {
		t.Errorf("Successful update returned an error: %s", err)
	}
	// The update should have stamped the set ownerRef onto the claim.
	updatedClaim, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
	if err != nil {
		t.Errorf("Error retrieving claim %s/%s: %v", claim.Namespace, claim.Name, err)
	}
	if !hasOwnerRef(updatedClaim, set) {
		t.Errorf("ownerRef not added: %s/%s", claim.Namespace, claim.Name)
	}
	events := collectEvents(recorder.Events)
	if eventCount := len(events); eventCount != 1 {
		t.Errorf("update failed: got %d events, but want 1", eventCount)
	}
}
// TestStatefulPodControlRetentionPolicyUpdateMissingClaims verifies that a
// pod update succeeds even when the pod's claims are absent from the lister,
// and that once the claims appear a subsequent update stamps the set ownerRef
// and records a SuccessfulUpdate event.
func TestStatefulPodControlRetentionPolicyUpdateMissingClaims(t *testing.T) {
	// Only applicable when the feature gate is on.
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
	recorder := record.NewFakeRecorder(10)
	set := newStatefulSet(1)
	set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
		WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
		WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
	}
	pod := newStatefulSetPod(set, 0)
	fakeClient := &fake.Clientset{}
	podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	podLister := corelisters.NewPodLister(podIndexer)
	claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
	podIndexer.Add(pod)
	// Mirror claim updates into the indexer so claimLister observes them.
	fakeClient.AddReactor("update", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
		update := action.(core.UpdateAction)
		claimIndexer.Update(update.GetObject())
		return true, update.GetObject(), nil
	})
	control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
	// No claims in the lister yet: the update must still succeed.
	if err := control.UpdateStatefulPod(set, pod); err != nil {
		t.Error("Unexpected error on pod update when PVCs are missing")
	}
	claims := getPersistentVolumeClaims(set, pod)
	if len(claims) != 1 {
		t.Error("Unexpected or missing PVCs")
	}
	var claim v1.PersistentVolumeClaim
	for k := range claims {
		claim = claims[k]
		claimIndexer.Add(&claim)
	}
	// Now that the claim exists, the update should stamp the set ownerRef.
	if err := control.UpdateStatefulPod(set, pod); err != nil {
		t.Errorf("Expected update to succeed, saw error %v", err)
	}
	updatedClaim, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
	if err != nil {
		t.Errorf("Error retrieving claim %s/%s: %v", claim.Namespace, claim.Name, err)
	}
	if !hasOwnerRef(updatedClaim, set) {
		t.Errorf("ownerRef not added: %s/%s", claim.Namespace, claim.Name)
	}
	events := collectEvents(recorder.Events)
	// Fatalf: indexing events[0] below would panic if no events were recorded.
	// (The original message claimed "want 2" while checking for 1.)
	if eventCount := len(events); eventCount != 1 {
		t.Fatalf("update failed: got %d events, but want 1", eventCount)
	}
	if !strings.Contains(events[0], "SuccessfulUpdate") {
		t.Errorf("expected first event to be a successful update: %s", events[0])
	}
}
func collectEvents(source <-chan string) []string {
done := false
events := make([]string, 0)

File diff suppressed because it is too large Load Diff

View File

@ -20,21 +20,28 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"sort"
"testing"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"
)
var parentKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
@ -43,11 +50,11 @@ func alwaysReady() bool { return true }
func TestStatefulSetControllerCreates(t *testing.T) {
set := newStatefulSet(3)
ssc, spc, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc); err != nil {
ssc, spc, om, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
if obj, _, err := spc.setsIndexer.Get(set); err != nil {
if obj, _, err := om.setsIndexer.Get(set); err != nil {
t.Error(err)
} else {
set = obj.(*apps.StatefulSet)
@ -59,11 +66,11 @@ func TestStatefulSetControllerCreates(t *testing.T) {
func TestStatefulSetControllerDeletes(t *testing.T) {
set := newStatefulSet(3)
ssc, spc, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc); err != nil {
ssc, spc, om, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
if obj, _, err := spc.setsIndexer.Get(set); err != nil {
if obj, _, err := om.setsIndexer.Get(set); err != nil {
t.Error(err)
} else {
set = obj.(*apps.StatefulSet)
@ -72,10 +79,10 @@ func TestStatefulSetControllerDeletes(t *testing.T) {
t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
}
*set.Spec.Replicas = 0
if err := scaleDownStatefulSetController(set, ssc, spc); err != nil {
if err := scaleDownStatefulSetController(set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn down StatefulSet : %s", err)
}
if obj, _, err := spc.setsIndexer.Get(set); err != nil {
if obj, _, err := om.setsIndexer.Get(set); err != nil {
t.Error(err)
} else {
set = obj.(*apps.StatefulSet)
@ -87,11 +94,11 @@ func TestStatefulSetControllerDeletes(t *testing.T) {
func TestStatefulSetControllerRespectsTermination(t *testing.T) {
set := newStatefulSet(3)
ssc, spc, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc); err != nil {
ssc, spc, om, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
if obj, _, err := spc.setsIndexer.Get(set); err != nil {
if obj, _, err := om.setsIndexer.Get(set); err != nil {
t.Error(err)
} else {
set = obj.(*apps.StatefulSet)
@ -99,11 +106,11 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) {
if set.Status.Replicas != 3 {
t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
}
_, err := spc.addTerminatingPod(set, 3)
_, err := om.addTerminatingPod(set, 3)
if err != nil {
t.Error(err)
}
pods, err := spc.addTerminatingPod(set, 4)
pods, err := om.addTerminatingPod(set, 4)
if err != nil {
t.Error(err)
}
@ -112,7 +119,7 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) {
if err != nil {
t.Error(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
@ -123,10 +130,10 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) {
spc.DeleteStatefulPod(set, pods[3])
spc.DeleteStatefulPod(set, pods[4])
*set.Spec.Replicas = 0
if err := scaleDownStatefulSetController(set, ssc, spc); err != nil {
if err := scaleDownStatefulSetController(set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn down StatefulSet : %s", err)
}
if obj, _, err := spc.setsIndexer.Get(set); err != nil {
if obj, _, err := om.setsIndexer.Get(set); err != nil {
t.Error(err)
} else {
set = obj.(*apps.StatefulSet)
@ -138,11 +145,11 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) {
func TestStatefulSetControllerBlocksScaling(t *testing.T) {
set := newStatefulSet(3)
ssc, spc, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc); err != nil {
ssc, spc, om, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
if obj, _, err := spc.setsIndexer.Get(set); err != nil {
if obj, _, err := om.setsIndexer.Get(set); err != nil {
t.Error(err)
} else {
set = obj.(*apps.StatefulSet)
@ -152,8 +159,8 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) {
}
*set.Spec.Replicas = 5
fakeResourceVersion(set)
spc.setsIndexer.Update(set)
_, err := spc.setPodTerminated(set, 0)
om.setsIndexer.Update(set)
_, err := om.setPodTerminated(set, 0)
if err != nil {
t.Error("Failed to set pod terminated at ordinal 0")
}
@ -163,7 +170,7 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) {
if err != nil {
t.Error(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
@ -174,7 +181,7 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) {
spc.DeleteStatefulPod(set, pods[0])
ssc.enqueueStatefulSet(set)
fakeWorker(ssc)
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
@ -186,9 +193,9 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) {
func TestStatefulSetControllerDeletionTimestamp(t *testing.T) {
set := newStatefulSet(3)
set.DeletionTimestamp = new(metav1.Time)
ssc, spc, _ := newFakeStatefulSetController(set)
ssc, _, om, _ := newFakeStatefulSetController(set)
spc.setsIndexer.Add(set)
om.setsIndexer.Add(set)
// Force a sync. It should not try to create any Pods.
ssc.enqueueStatefulSet(set)
@ -198,7 +205,7 @@ func TestStatefulSetControllerDeletionTimestamp(t *testing.T) {
if err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
@ -211,17 +218,17 @@ func TestStatefulSetControllerDeletionTimestampRace(t *testing.T) {
set := newStatefulSet(3)
// The bare client says it IS deleted.
set.DeletionTimestamp = new(metav1.Time)
ssc, spc, ssh := newFakeStatefulSetController(set)
ssc, _, om, ssh := newFakeStatefulSetController(set)
// The lister (cache) says it's NOT deleted.
set2 := *set
set2.DeletionTimestamp = nil
spc.setsIndexer.Add(&set2)
om.setsIndexer.Add(&set2)
// The recheck occurs in the presence of a matching orphan.
pod := newStatefulSetPod(set, 1)
pod.OwnerReferences = nil
spc.podsIndexer.Add(pod)
om.podsIndexer.Add(pod)
set.Status.CollisionCount = new(int32)
revision, err := newRevision(set, 1, set.Status.CollisionCount)
if err != nil {
@ -241,7 +248,7 @@ func TestStatefulSetControllerDeletionTimestampRace(t *testing.T) {
if err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
@ -272,13 +279,13 @@ func TestStatefulSetControllerDeletionTimestampRace(t *testing.T) {
}
func TestStatefulSetControllerAddPod(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
pod1 := newStatefulSetPod(set1, 0)
pod2 := newStatefulSetPod(set2, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
om.setsIndexer.Add(set1)
om.setsIndexer.Add(set2)
ssc.addPod(pod1)
key, done := ssc.queue.Get()
@ -304,7 +311,7 @@ func TestStatefulSetControllerAddPod(t *testing.T) {
}
func TestStatefulSetControllerAddPodOrphan(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
@ -312,9 +319,9 @@ func TestStatefulSetControllerAddPodOrphan(t *testing.T) {
set3.Name = "foo3"
set3.Spec.Selector.MatchLabels = map[string]string{"foo3": "bar"}
pod := newStatefulSetPod(set1, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
spc.setsIndexer.Add(set3)
om.setsIndexer.Add(set1)
om.setsIndexer.Add(set2)
om.setsIndexer.Add(set3)
// Make pod an orphan. Expect matching sets to be queued.
pod.OwnerReferences = nil
@ -325,7 +332,7 @@ func TestStatefulSetControllerAddPodOrphan(t *testing.T) {
}
func TestStatefulSetControllerAddPodNoSet(t *testing.T) {
ssc, _, _ := newFakeStatefulSetController()
ssc, _, _, _ := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
ssc.addPod(pod)
@ -337,14 +344,14 @@ func TestStatefulSetControllerAddPodNoSet(t *testing.T) {
}
func TestStatefulSetControllerUpdatePod(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod1 := newStatefulSetPod(set1, 0)
pod2 := newStatefulSetPod(set2, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
om.setsIndexer.Add(set1)
om.setsIndexer.Add(set2)
prev := *pod1
fakeResourceVersion(pod1)
@ -372,7 +379,7 @@ func TestStatefulSetControllerUpdatePod(t *testing.T) {
}
func TestStatefulSetControllerUpdatePodWithNoSet(t *testing.T) {
ssc, _, _ := newFakeStatefulSetController()
ssc, _, _, _ := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
prev := *pod
@ -386,10 +393,10 @@ func TestStatefulSetControllerUpdatePodWithNoSet(t *testing.T) {
}
func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
spc.setsIndexer.Add(set)
om.setsIndexer.Add(set)
ssc.updatePod(pod, pod)
ssc.queue.ShutDown()
key, _ := ssc.queue.Get()
@ -399,14 +406,14 @@ func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) {
}
func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
pod.OwnerReferences = nil
set2 := newStatefulSet(3)
set2.Name = "foo2"
spc.setsIndexer.Add(set)
spc.setsIndexer.Add(set2)
om.setsIndexer.Add(set)
om.setsIndexer.Add(set2)
clone := *pod
clone.Labels = map[string]string{"foo2": "bar2"}
fakeResourceVersion(&clone)
@ -417,14 +424,14 @@ func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) {
}
func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod := newStatefulSetPod(set, 0)
pod2 := newStatefulSetPod(set2, 0)
spc.setsIndexer.Add(set)
spc.setsIndexer.Add(set2)
om.setsIndexer.Add(set)
om.setsIndexer.Add(set2)
clone := *pod
clone.OwnerReferences = pod2.OwnerReferences
fakeResourceVersion(&clone)
@ -435,13 +442,13 @@ func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) {
}
func TestStatefulSetControllerUpdatePodRelease(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod := newStatefulSetPod(set, 0)
spc.setsIndexer.Add(set)
spc.setsIndexer.Add(set2)
om.setsIndexer.Add(set)
om.setsIndexer.Add(set2)
clone := *pod
clone.OwnerReferences = nil
fakeResourceVersion(&clone)
@ -452,14 +459,14 @@ func TestStatefulSetControllerUpdatePodRelease(t *testing.T) {
}
func TestStatefulSetControllerDeletePod(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod1 := newStatefulSetPod(set1, 0)
pod2 := newStatefulSetPod(set2, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
om.setsIndexer.Add(set1)
om.setsIndexer.Add(set2)
ssc.deletePod(pod1)
key, done := ssc.queue.Get()
@ -483,13 +490,13 @@ func TestStatefulSetControllerDeletePod(t *testing.T) {
}
func TestStatefulSetControllerDeletePodOrphan(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod1 := newStatefulSetPod(set1, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
om.setsIndexer.Add(set1)
om.setsIndexer.Add(set2)
pod1.OwnerReferences = nil
ssc.deletePod(pod1)
@ -499,10 +506,10 @@ func TestStatefulSetControllerDeletePodOrphan(t *testing.T) {
}
func TestStatefulSetControllerDeletePodTombstone(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
spc.setsIndexer.Add(set)
om.setsIndexer.Add(set)
tombstoneKey, _ := controller.KeyFunc(pod)
tombstone := cache.DeletedFinalStateUnknown{Key: tombstoneKey, Obj: pod}
ssc.deletePod(tombstone)
@ -517,14 +524,14 @@ func TestStatefulSetControllerDeletePodTombstone(t *testing.T) {
}
func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) {
ssc, spc, _ := newFakeStatefulSetController()
ssc, _, om, _ := newFakeStatefulSetController()
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
pod := newStatefulSetPod(set1, 0)
spc.setsIndexer.Add(set1)
spc.setsIndexer.Add(set2)
spc.podsIndexer.Add(pod)
om.setsIndexer.Add(set1)
om.setsIndexer.Add(set2)
om.podsIndexer.Add(pod)
sets := ssc.getStatefulSetsForPod(pod)
if got, want := len(sets), 2; got != want {
t.Errorf("len(sets) = %v, want %v", got, want)
@ -546,12 +553,12 @@ func TestGetPodsForStatefulSetAdopt(t *testing.T) {
pod4.OwnerReferences = nil
pod4.Name = "x" + pod4.Name
ssc, spc, _ := newFakeStatefulSetController(set, pod1, pod2, pod3, pod4)
ssc, _, om, _ := newFakeStatefulSetController(set, pod1, pod2, pod3, pod4)
spc.podsIndexer.Add(pod1)
spc.podsIndexer.Add(pod2)
spc.podsIndexer.Add(pod3)
spc.podsIndexer.Add(pod4)
om.podsIndexer.Add(pod1)
om.podsIndexer.Add(pod2)
om.podsIndexer.Add(pod3)
om.podsIndexer.Add(pod4)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Fatal(err)
@ -588,10 +595,10 @@ func TestAdoptOrphanRevisions(t *testing.T) {
ss1Rev2.Namespace = ss1.Namespace
ss1Rev2.OwnerReferences = []metav1.OwnerReference{}
ssc, spc, _ := newFakeStatefulSetController(ss1, ss1Rev1, ss1Rev2)
ssc, _, om, _ := newFakeStatefulSetController(ss1, ss1Rev1, ss1Rev2)
spc.revisionsIndexer.Add(ss1Rev1)
spc.revisionsIndexer.Add(ss1Rev2)
om.revisionsIndexer.Add(ss1Rev1)
om.revisionsIndexer.Add(ss1Rev2)
err = ssc.adoptOrphanRevisions(context.TODO(), ss1)
if err != nil {
@ -615,7 +622,7 @@ func TestAdoptOrphanRevisions(t *testing.T) {
func TestGetPodsForStatefulSetRelease(t *testing.T) {
set := newStatefulSet(3)
ssc, spc, _ := newFakeStatefulSetController(set)
ssc, _, om, _ := newFakeStatefulSetController(set)
pod1 := newStatefulSetPod(set, 1)
// pod2 is owned but has wrong name.
pod2 := newStatefulSetPod(set, 2)
@ -628,9 +635,10 @@ func TestGetPodsForStatefulSetRelease(t *testing.T) {
pod4.OwnerReferences = nil
pod4.Labels = nil
spc.podsIndexer.Add(pod1)
spc.podsIndexer.Add(pod2)
spc.podsIndexer.Add(pod3)
om.podsIndexer.Add(pod1)
om.podsIndexer.Add(pod2)
om.podsIndexer.Add(pod3)
om.podsIndexer.Add(pod4)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Fatal(err)
@ -651,10 +659,255 @@ func TestGetPodsForStatefulSetRelease(t *testing.T) {
}
}
func newFakeStatefulSetController(initialObjects ...runtime.Object) (*StatefulSetController, *fakeStatefulPodControl, history.Interface) {
// TestOrphanedPodsWithPVCDeletePolicy exercises PVC ownerRef management across
// every combination of WhenScaled/WhenDeleted retention policy. It creates a
// set with one normal pod, one orphaned pod (no ownerRefs) and one pod with a
// non-member name, syncs the controller twice (once to adopt the orphan, once
// after scaling to zero to condemn everything), and then checks which claims
// carry pod-named and/or set-named owner references.
func TestOrphanedPodsWithPVCDeletePolicy(t *testing.T) {
	// The auto-delete behavior is gated; enable it for the duration of the test.
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
	testFn := func(t *testing.T, scaledownPolicy, deletionPolicy apps.PersistentVolumeClaimRetentionPolicyType) {
		set := newStatefulSet(4)
		*set.Spec.Replicas = 2
		set.Spec.PersistentVolumeClaimRetentionPolicy.WhenScaled = scaledownPolicy
		set.Spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted = deletionPolicy
		ssc, _, om, _ := newFakeStatefulSetController(set)
		om.setsIndexer.Add(set)
		pods := []*v1.Pod{}
		pods = append(pods, newStatefulSetPod(set, 0))
		// pod1 is orphaned
		pods = append(pods, newStatefulSetPod(set, 1))
		pods[1].OwnerReferences = nil
		// pod2 is owned but has wrong name.
		pods = append(pods, newStatefulSetPod(set, 2))
		pods[2].Name = "x" + pods[2].Name
		// The fake clientset does not apply patches itself, so emulate the
		// adoption patch by strategic-merging it into the in-memory pod.
		ssc.kubeClient.(*fake.Clientset).PrependReactor("patch", "pods", func(action core.Action) (bool, runtime.Object, error) {
			patch := action.(core.PatchAction).GetPatch()
			target := action.(core.PatchAction).GetName()
			var pod *v1.Pod
			for _, p := range pods {
				if p.Name == target {
					pod = p
					break
				}
			}
			if pod == nil {
				t.Fatalf("Can't find patch target %s", target)
			}
			original, err := json.Marshal(pod)
			if err != nil {
				t.Fatalf("failed to marshal original pod %s: %v", pod.Name, err)
			}
			updated, err := strategicpatch.StrategicMergePatch(original, patch, v1.Pod{})
			if err != nil {
				t.Fatalf("failed to apply strategic merge patch %q on node %s: %v", patch, pod.Name, err)
			}
			if err := json.Unmarshal(updated, pod); err != nil {
				t.Fatalf("failed to unmarshal updated pod %s: %v", pod.Name, err)
			}
			return true, pod, nil
		})
		// Register each pod and a claim for each of its volumes.
		for _, pod := range pods {
			om.podsIndexer.Add(pod)
			claims := getPersistentVolumeClaims(set, pod)
			for _, claim := range claims {
				om.CreateClaim(&claim)
			}
		}
		// Mark all pods ready and running so the sync loop treats them as healthy.
		for i := range pods {
			if _, err := om.setPodReady(set, i); err != nil {
				t.Errorf("%d: %v", i, err)
			}
			if _, err := om.setPodRunning(set, i); err != nil {
				t.Errorf("%d: %v", i, err)
			}
		}
		// First sync to manage orphaned pod, then set replicas.
		ssc.enqueueStatefulSet(set)
		fakeWorker(ssc)
		*set.Spec.Replicas = 0 // Put an ownerRef for all scale-down deleted PVCs.
		ssc.enqueueStatefulSet(set)
		fakeWorker(ssc)
		// hasNamedOwnerRef reports whether any ownerRef on claim has the given name.
		hasNamedOwnerRef := func(claim *v1.PersistentVolumeClaim, name string) bool {
			for _, ownerRef := range claim.GetOwnerReferences() {
				if ownerRef.Name == name {
					return true
				}
			}
			return false
		}
		// verifyOwnerRefs asserts the expected pod/set ownerRefs for the policy
		// combination under test; condemned indicates the claim's pod ordinal is
		// beyond the current replica count.
		verifyOwnerRefs := func(claim *v1.PersistentVolumeClaim, condemned bool) {
			podName := getClaimPodName(set, claim)
			const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
			const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
			switch {
			case scaledownPolicy == retain && deletionPolicy == retain:
				// Fully retained claims should carry no StatefulSet-managed refs.
				if hasNamedOwnerRef(claim, podName) || hasNamedOwnerRef(claim, set.Name) {
					t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
				}
			case scaledownPolicy == retain && deletionPolicy == delete:
				// Only a set-named ref: claim dies with the set, not with its pod.
				if hasNamedOwnerRef(claim, podName) || !hasNamedOwnerRef(claim, set.Name) {
					t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
				}
			case scaledownPolicy == delete && deletionPolicy == retain:
				// Pod-named ref only while condemned; never a set-named ref.
				if hasNamedOwnerRef(claim, podName) != condemned || hasNamedOwnerRef(claim, set.Name) {
					t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
				}
			case scaledownPolicy == delete && deletionPolicy == delete:
				// Pod-named ref while condemned, plus a set-named ref.
				if hasNamedOwnerRef(claim, podName) != condemned || !hasNamedOwnerRef(claim, set.Name) {
					t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
				}
			}
		}
		claims, _ := om.claimsLister.PersistentVolumeClaims(set.Namespace).List(labels.Everything())
		if len(claims) != len(pods) {
			t.Errorf("Unexpected number of claims: %d", len(claims))
		}
		for _, claim := range claims {
			// Only the first pod and the reclaimed orphan pod should have owner refs.
			switch claim.Name {
			case "datadir-foo-0", "datadir-foo-1":
				verifyOwnerRefs(claim, false)
			case "datadir-foo-2":
				// The mis-named pod is not a member, so its claim gets no refs.
				if hasNamedOwnerRef(claim, getClaimPodName(set, claim)) || hasNamedOwnerRef(claim, set.Name) {
					t.Errorf("unexpected ownerRefs for %s: %v", claim.Name, claim.GetOwnerReferences())
				}
			default:
				t.Errorf("Unexpected claim %s", claim.Name)
			}
		}
	}

	// Run testFn for the cross product of both retention policies.
	policies := []apps.PersistentVolumeClaimRetentionPolicyType{
		apps.RetainPersistentVolumeClaimRetentionPolicyType,
		apps.DeletePersistentVolumeClaimRetentionPolicyType,
	}
	for _, scaledownPolicy := range policies {
		for _, deletionPolicy := range policies {
			testName := fmt.Sprintf("ScaleDown:%s/SetDeletion:%s", scaledownPolicy, deletionPolicy)
			t.Run(testName, func(t *testing.T) { testFn(t, scaledownPolicy, deletionPolicy) })
		}
	}
}
// TestStaleOwnerRefOnScaleup verifies that a condemned PVC left with an owner
// reference to a stale pod UID blocks scale-up: the controller must not reuse
// the claim until it is deleted, after which scale-up completes normally.
// Both WhenScaled=Delete policy variants are exercised.
func TestStaleOwnerRefOnScaleup(t *testing.T) {
	// The auto-delete behavior is gated; enable it for the duration of the test.
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
	for _, policy := range []*apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
		{
			WhenScaled:  apps.DeletePersistentVolumeClaimRetentionPolicyType,
			WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
		},
		{
			WhenScaled:  apps.DeletePersistentVolumeClaimRetentionPolicyType,
			WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
		},
	} {
		// onPolicy prefixes a failure message with the policy under test.
		onPolicy := func(msg string, args ...interface{}) string {
			return fmt.Sprintf(fmt.Sprintf("(%s) %s", policy, msg), args...)
		}
		set := newStatefulSet(3)
		set.Spec.PersistentVolumeClaimRetentionPolicy = policy
		ssc, spc, om, _ := newFakeStatefulSetController(set)
		if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
			t.Errorf(onPolicy("Failed to turn up StatefulSet : %s", err))
		}
		var err error
		if set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name); err != nil {
			t.Errorf(onPolicy("Could not get scaled up set: %v", err))
		}
		if set.Status.Replicas != 3 {
			t.Errorf(onPolicy("set.Status.Replicas = %v; want 3", set.Status.Replicas))
		}
		// Scale down to 2, condemning pod foo-2 and leaving its claim with a pod ownerRef.
		*set.Spec.Replicas = 2
		if err := scaleDownStatefulSetController(set, ssc, spc, om); err != nil {
			// Fixed message: dropped leftover "msg, " from the format string.
			t.Errorf(onPolicy("Failed to scale down StatefulSet : %s", err))
		}
		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
		if err != nil {
			t.Errorf(onPolicy("Could not get scaled down StatefulSet: %v", err))
		}
		if set.Status.Replicas != 2 {
			t.Errorf(onPolicy("Failed to scale statefulset to 2 replicas"))
		}
		var claim *v1.PersistentVolumeClaim
		claim, err = om.claimsLister.PersistentVolumeClaims(set.Namespace).Get("datadir-foo-2")
		if err != nil {
			t.Errorf(onPolicy("Could not find expected pvc datadir-foo-2"))
		}
		refs := claim.GetOwnerReferences()
		if len(refs) != 1 {
			t.Errorf(onPolicy("Expected only one refs: %v", refs))
		}
		// Make the pod ref stale.
		for i := range refs {
			if refs[i].Name == "foo-2" {
				refs[i].UID = "stale"
				break
			}
		}
		claim.SetOwnerReferences(refs)
		if err = om.claimsIndexer.Update(claim); err != nil {
			t.Errorf(onPolicy("Could not update claim with new owner ref: %v", err))
		}
		*set.Spec.Replicas = 3
		// Until the stale PVC goes away, the scale up should never finish. Run 10 iterations, then delete the PVC.
		if err := scaleUpStatefulSetControllerBounded(set, ssc, spc, om, 10); err != nil {
			t.Errorf(onPolicy("Failed attempt to scale StatefulSet back up: %v", err))
		}
		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
		if err != nil {
			t.Errorf(onPolicy("Could not get scaled down StatefulSet: %v", err))
		}
		if set.Status.Replicas != 2 {
			t.Errorf(onPolicy("Expected set to stay at two replicas"))
		}
		// The stale ref must survive the blocked scale-up attempts untouched.
		claim, err = om.claimsLister.PersistentVolumeClaims(set.Namespace).Get("datadir-foo-2")
		if err != nil {
			t.Errorf(onPolicy("Could not find expected pvc datadir-foo-2"))
		}
		refs = claim.GetOwnerReferences()
		if len(refs) != 1 {
			t.Errorf(onPolicy("Unexpected change to condemned pvc ownerRefs: %v", refs))
		}
		foundPodRef := false
		for i := range refs {
			if refs[i].UID == "stale" {
				foundPodRef = true
				break
			}
		}
		if !foundPodRef {
			t.Errorf(onPolicy("Claim ref unexpectedly changed: %v", refs))
		}
		// Deleting the stale claim unblocks the scale-up.
		if err = om.claimsIndexer.Delete(claim); err != nil {
			t.Errorf(onPolicy("Could not delete stale pvc: %v", err))
		}
		if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
			t.Errorf(onPolicy("Failed to scale StatefulSet back up: %v", err))
		}
		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
		if err != nil {
			t.Errorf(onPolicy("Could not get scaled down StatefulSet: %v", err))
		}
		if set.Status.Replicas != 3 {
			t.Errorf(onPolicy("Failed to scale set back up once PVC was deleted"))
		}
	}
}
func newFakeStatefulSetController(initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) {
client := fake.NewSimpleClientset(initialObjects...)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
fpc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1().StatefulSets(), informerFactory.Apps().V1().ControllerRevisions())
om := newFakeObjectManager(informerFactory)
spc := NewStatefulPodControlFromManager(om, &noopRecorder{})
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
ssc := NewStatefulSetController(
informerFactory.Core().V1().Pods(),
@ -667,9 +920,9 @@ func newFakeStatefulSetController(initialObjects ...runtime.Object) (*StatefulSe
ssc.podListerSynced = alwaysReady
ssc.setListerSynced = alwaysReady
recorder := record.NewFakeRecorder(10)
ssc.control = NewDefaultStatefulSetControl(fpc, ssu, ssh, recorder)
ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh, recorder)
return ssc, fpc, ssh
return ssc, spc, om, ssh
}
func fakeWorker(ssc *StatefulSetController) {
@ -687,21 +940,27 @@ func getPodAtOrdinal(pods []*v1.Pod, ordinal int) *v1.Pod {
return pods[ordinal]
}
func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *fakeStatefulPodControl) error {
spc.setsIndexer.Add(set)
func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error {
return scaleUpStatefulSetControllerBounded(set, ssc, spc, om, -1)
}
func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager, maxIterations int) error {
om.setsIndexer.Add(set)
ssc.enqueueStatefulSet(set)
fakeWorker(ssc)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return err
}
for set.Status.ReadyReplicas < *set.Spec.Replicas {
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
iterations := 0
for (maxIterations < 0 || iterations < maxIterations) && set.Status.ReadyReplicas < *set.Spec.Replicas {
iterations++
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
return err
}
ord := len(pods) - 1
if pods, err = spc.setPodPending(set, ord); err != nil {
if pods, err = om.setPodPending(set, ord); err != nil {
return err
}
pod := getPodAtOrdinal(pods, ord)
@ -709,7 +968,7 @@ func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetControl
fakeWorker(ssc)
pod = getPodAtOrdinal(pods, ord)
prev := *pod
if pods, err = spc.setPodRunning(set, ord); err != nil {
if pods, err = om.setPodRunning(set, ord); err != nil {
return err
}
pod = getPodAtOrdinal(pods, ord)
@ -717,31 +976,31 @@ func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetControl
fakeWorker(ssc)
pod = getPodAtOrdinal(pods, ord)
prev = *pod
if pods, err = spc.setPodReady(set, ord); err != nil {
if pods, err = om.setPodReady(set, ord); err != nil {
return err
}
pod = getPodAtOrdinal(pods, ord)
ssc.updatePod(&prev, pod)
fakeWorker(ssc)
if err := assertMonotonicInvariants(set, spc); err != nil {
if err := assertMonotonicInvariants(set, om); err != nil {
return err
}
obj, _, err := spc.setsIndexer.Get(set)
obj, _, err := om.setsIndexer.Get(set)
if err != nil {
return err
}
set = obj.(*apps.StatefulSet)
}
return assertMonotonicInvariants(set, spc)
return assertMonotonicInvariants(set, om)
}
func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *fakeStatefulPodControl) error {
func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return err
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
return err
}
@ -749,10 +1008,10 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr
pod := getPodAtOrdinal(pods, ord)
prev := *pod
fakeResourceVersion(set)
spc.setsIndexer.Add(set)
om.setsIndexer.Add(set)
ssc.enqueueStatefulSet(set)
fakeWorker(ssc)
pods, err = spc.addTerminatingPod(set, ord)
pods, err = om.addTerminatingPod(set, ord)
if err != nil {
return err
}
@ -763,13 +1022,13 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr
ssc.deletePod(pod)
fakeWorker(ssc)
for set.Status.Replicas > *set.Spec.Replicas {
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
return err
}
ord := len(pods)
pods, err = spc.addTerminatingPod(set, ord)
pods, err = om.addTerminatingPod(set, ord)
if err != nil {
return err
}
@ -779,14 +1038,14 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr
spc.DeleteStatefulPod(set, pod)
ssc.deletePod(pod)
fakeWorker(ssc)
obj, _, err := spc.setsIndexer.Get(set)
obj, _, err := om.setsIndexer.Get(set)
if err != nil {
return err
}
set = obj.(*apps.StatefulSet)
}
return assertMonotonicInvariants(set, spc)
return assertMonotonicInvariants(set, om)
}
func rawTemplate(template *v1.PodTemplateSpec) runtime.RawExtension {

View File

@ -20,6 +20,7 @@ import (
"fmt"
"math/rand"
"reflect"
"regexp"
"sort"
"strconv"
"testing"
@ -27,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
apps "k8s.io/api/apps/v1"
@ -35,6 +37,28 @@ import (
"k8s.io/kubernetes/pkg/controller/history"
)
// noopRecorder is an EventRecorder that does nothing. record.FakeRecorder has a fixed
// buffer size, which causes tests to hang if that buffer's exceeded.
type noopRecorder struct{}

// Event discards the event.
func (r *noopRecorder) Event(object runtime.Object, eventtype, reason, message string) {}

// Eventf discards the formatted event.
func (r *noopRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
}

// AnnotatedEventf discards the annotated, formatted event.
func (r *noopRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
}
// getClaimPodName gets the name of the Pod associated with the Claim, or an
// empty string if the claim name doesn't follow the
// "<volume>-<setName>-<ordinal>" convention for this set.
//
// set.Name is escaped with regexp.QuoteMeta so a set name containing regexp
// metacharacters cannot corrupt or over-match the pattern.
func getClaimPodName(set *apps.StatefulSet, claim *v1.PersistentVolumeClaim) string {
	statefulClaimRegex := regexp.MustCompile(fmt.Sprintf(".*-(%s-[0-9]+)$", regexp.QuoteMeta(set.Name)))
	matches := statefulClaimRegex.FindStringSubmatch(claim.Name)
	// FindStringSubmatch returns [full, group] on a match; anything else is no match.
	if len(matches) != 2 {
		return ""
	}
	return matches[1]
}
func TestGetParentNameAndOrdinal(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 1)
@ -51,6 +75,28 @@ func TestGetParentNameAndOrdinal(t *testing.T) {
}
}
// TestGetClaimPodName checks that getClaimPodName extracts "<setName>-<ordinal>"
// from a claim name of the form "<volume>-<setName>-<ordinal>", and yields ""
// for claim names that do not match the set.
func TestGetClaimPodName(t *testing.T) {
	set := apps.StatefulSet{}
	set.Name = "my-set"
	cases := []struct {
		claimName string
		want      string
		errFmt    string // message used when the extracted name differs from want
	}{
		{"volume-my-set-2", "my-set-2", "Expected my-set-2 found %s"},
		{"long-volume-my-set-20", "my-set-20", "Expected my-set-20 found %s"},
		{"volume-2-my-set", "", "Expected empty string found %s"},
		{"volume-pod-2", "", "Expected empty string found %s"},
	}
	for _, tc := range cases {
		claim := v1.PersistentVolumeClaim{}
		claim.Name = tc.claimName
		if got := getClaimPodName(&set, &claim); got != tc.want {
			t.Errorf(tc.errFmt, got)
		}
	}
}
func TestIsMemberOf(t *testing.T) {
set := newStatefulSet(3)
set2 := newStatefulSet(3)
@ -180,6 +226,350 @@ func TestUpdateStorage(t *testing.T) {
}
}
// TestGetPersistentVolumeClaimRetentionPolicy checks that the retention policy
// set on the StatefulSet spec is what getPersistentVolumeClaimRetentionPolicy
// reports back.
func TestGetPersistentVolumeClaimRetentionPolicy(t *testing.T) {
	const (
		retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
		delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
	)
	var set apps.StatefulSet

	// Retain on both scale-down and set deletion.
	set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
		WhenScaled:  retain,
		WhenDeleted: retain,
	}
	if policy := getPersistentVolumeClaimRetentionPolicy(&set); policy.WhenScaled != retain || policy.WhenDeleted != retain {
		t.Errorf("Expected retain policy")
	}

	// Delete on scale-down only.
	set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
		WhenScaled:  delete,
		WhenDeleted: retain,
	}
	if policy := getPersistentVolumeClaimRetentionPolicy(&set); policy.WhenScaled != delete || policy.WhenDeleted != retain {
		t.Errorf("Expected scaledown policy")
	}
}
// TestClaimOwnerMatchesSetAndPod enumerates every retention-policy combination
// and, for each, every combination of pre-existing pod/set/unrelated ownerRefs
// on a claim. claimOwnerMatchesSetAndPod must report true exactly when the
// refs present match what the policy requires (needsPodRef/needsSetRef).
func TestClaimOwnerMatchesSetAndPod(t *testing.T) {
	testCases := []struct {
		name            string
		scaleDownPolicy apps.PersistentVolumeClaimRetentionPolicyType
		setDeletePolicy apps.PersistentVolumeClaimRetentionPolicyType
		needsPodRef     bool // whether the policy requires a pod ownerRef on the claim
		needsSetRef     bool // whether the policy requires a set ownerRef on the claim
		replicas        int32
		ordinal         int // pod ordinal; >= replicas means the pod is condemned
	}{
		{
			name:            "retain",
			scaleDownPolicy: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			needsPodRef:     false,
			needsSetRef:     false,
		},
		{
			name:            "on SS delete",
			scaleDownPolicy: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			needsPodRef:     false,
			needsSetRef:     true,
		},
		{
			name:            "on scaledown only, condemned",
			scaleDownPolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			needsPodRef:     true,
			needsSetRef:     false,
			replicas:        2,
			ordinal:         2,
		},
		{
			name:            "on scaledown only, remains",
			scaleDownPolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			needsPodRef:     false,
			needsSetRef:     false,
			replicas:        2,
			ordinal:         1,
		},
		{
			name:            "on both, condemned",
			scaleDownPolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			needsPodRef:     true,
			needsSetRef:     false,
			replicas:        2,
			ordinal:         2,
		},
		{
			name:            "on both, remains",
			scaleDownPolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			needsPodRef:     false,
			needsSetRef:     true,
			replicas:        2,
			ordinal:         1,
		},
	}
	for _, tc := range testCases {
		// Exercise all 8 combinations of which refs are actually on the claim,
		// with and without unrelated "noise" refs present.
		for _, useOtherRefs := range []bool{false, true} {
			for _, setPodRef := range []bool{false, true} {
				for _, setSetRef := range []bool{false, true} {
					claim := v1.PersistentVolumeClaim{}
					claim.Name = "target-claim"
					pod := v1.Pod{}
					pod.Name = fmt.Sprintf("pod-%d", tc.ordinal)
					pod.GetObjectMeta().SetUID("pod-123")
					set := apps.StatefulSet{}
					set.Name = "stateful-set"
					set.GetObjectMeta().SetUID("ss-456")
					set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
						WhenScaled:  tc.scaleDownPolicy,
						WhenDeleted: tc.setDeletePolicy,
					}
					set.Spec.Replicas = &tc.replicas
					if setPodRef {
						setOwnerRef(&claim, &pod, &pod.TypeMeta)
					}
					if setSetRef {
						setOwnerRef(&claim, &set, &set.TypeMeta)
					}
					if useOtherRefs {
						// Unrelated owner refs must not affect the match result.
						randomObject1 := v1.Pod{}
						randomObject1.Name = "rand1"
						randomObject1.GetObjectMeta().SetUID("rand1-abc")
						randomObject2 := v1.Pod{}
						randomObject2.Name = "rand2"
						randomObject2.GetObjectMeta().SetUID("rand2-def")
						setOwnerRef(&claim, &randomObject1, &randomObject1.TypeMeta)
						setOwnerRef(&claim, &randomObject2, &randomObject2.TypeMeta)
					}
					// A match means the refs present are exactly the refs the policy needs.
					shouldMatch := setPodRef == tc.needsPodRef && setSetRef == tc.needsSetRef
					if claimOwnerMatchesSetAndPod(&claim, &set, &pod) != shouldMatch {
						t.Errorf("Bad match for %s with pod=%v,set=%v,others=%v", tc.name, setPodRef, setSetRef, useOtherRefs)
					}
				}
			}
		}
	}
}
// TestUpdateClaimOwnerRefForSetAndPod checks that updateClaimOwnerRefForSetAndPod
// brings a claim's ownerRefs in line with the retention policy (adding/removing
// pod and set refs as needed) and that its return value reports whether any
// change was actually required.
func TestUpdateClaimOwnerRefForSetAndPod(t *testing.T) {
	testCases := []struct {
		name            string
		scaleDownPolicy apps.PersistentVolumeClaimRetentionPolicyType
		setDeletePolicy apps.PersistentVolumeClaimRetentionPolicyType
		condemned       bool // pod ordinal beyond the replica count
		needsPodRef     bool // expected pod ownerRef after the update
		needsSetRef     bool // expected set ownerRef after the update
	}{
		{
			name:            "retain",
			scaleDownPolicy: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			condemned:       false,
			needsPodRef:     false,
			needsSetRef:     false,
		},
		{
			name:            "delete with set",
			scaleDownPolicy: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			condemned:       false,
			needsPodRef:     false,
			needsSetRef:     true,
		},
		{
			name:            "delete with scaledown, not condemned",
			scaleDownPolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			condemned:       false,
			needsPodRef:     false,
			needsSetRef:     false,
		},
		{
			name:            "delete on scaledown, condemned",
			scaleDownPolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.RetainPersistentVolumeClaimRetentionPolicyType,
			condemned:       true,
			needsPodRef:     true,
			needsSetRef:     false,
		},
		{
			name:            "delete on both, not condemned",
			scaleDownPolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			condemned:       false,
			needsPodRef:     false,
			needsSetRef:     true,
		},
		{
			name:            "delete on both, condemned",
			scaleDownPolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			setDeletePolicy: apps.DeletePersistentVolumeClaimRetentionPolicyType,
			condemned:       true,
			needsPodRef:     true,
			needsSetRef:     false,
		},
	}
	for _, tc := range testCases {
		// Start from every combination of refs already present on the claim.
		for _, hasPodRef := range []bool{true, false} {
			for _, hasSetRef := range []bool{true, false} {
				set := apps.StatefulSet{}
				set.Name = "ss"
				numReplicas := int32(5)
				set.Spec.Replicas = &numReplicas
				set.SetUID("ss-123")
				set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
					WhenScaled:  tc.scaleDownPolicy,
					WhenDeleted: tc.setDeletePolicy,
				}
				pod := v1.Pod{}
				// ordinal 8 >= 5 replicas makes the pod condemned; 1 does not.
				if tc.condemned {
					pod.Name = "pod-8"
				} else {
					pod.Name = "pod-1"
				}
				pod.SetUID("pod-456")
				claim := v1.PersistentVolumeClaim{}
				if hasPodRef {
					setOwnerRef(&claim, &pod, &pod.TypeMeta)
				}
				if hasSetRef {
					setOwnerRef(&claim, &set, &set.TypeMeta)
				}
				// The function should report an update exactly when the starting
				// refs differ from what the policy requires.
				needsUpdate := hasPodRef != tc.needsPodRef || hasSetRef != tc.needsSetRef
				shouldUpdate := updateClaimOwnerRefForSetAndPod(&claim, &set, &pod)
				if shouldUpdate != needsUpdate {
					t.Errorf("Bad update for %s hasPodRef=%v hasSetRef=%v", tc.name, hasPodRef, hasSetRef)
				}
				if hasOwnerRef(&claim, &pod) != tc.needsPodRef {
					t.Errorf("Bad pod ref for %s hasPodRef=%v hasSetRef=%v", tc.name, hasPodRef, hasSetRef)
				}
				if hasOwnerRef(&claim, &set) != tc.needsSetRef {
					t.Errorf("Bad set ref for %s hasPodRef=%v hasSetRef=%v", tc.name, hasPodRef, hasSetRef)
				}
			}
		}
	}
}
// TestHasOwnerRef verifies that hasOwnerRef matches owners purely by UID.
func TestHasOwnerRef(t *testing.T) {
	target := v1.Pod{}
	target.SetOwnerReferences([]metav1.OwnerReference{
		{UID: "123"}, {UID: "456"}})
	// An owner whose UID appears in the reference list is found.
	knownOwner := v1.Pod{}
	knownOwner.GetObjectMeta().SetUID("123")
	if !hasOwnerRef(&target, &knownOwner) {
		t.Error("Missing owner")
	}
	// An owner with an unlisted UID is not found.
	unknownOwner := v1.Pod{}
	unknownOwner.GetObjectMeta().SetUID("789")
	if hasOwnerRef(&target, &unknownOwner) {
		t.Error("Unexpected owner")
	}
}
// TestHasStaleOwnerRef verifies that hasStaleOwnerRef flags references that
// match an owner by name but disagree on UID.
func TestHasStaleOwnerRef(t *testing.T) {
	target := v1.Pod{}
	target.SetOwnerReferences([]metav1.OwnerReference{
		{Name: "bob", UID: "123"}, {Name: "shirley", UID: "456"}})
	// makeOwner builds a pod with the given name and UID for use as an owner.
	makeOwner := func(name, uid string) *v1.Pod {
		owner := v1.Pod{}
		owner.Name = name
		owner.SetUID(types.UID(uid))
		return &owner
	}
	// Name and UID both match: not stale.
	if hasStaleOwnerRef(&target, makeOwner("bob", "123")) {
		t.Error("ownerA should not be stale")
	}
	// Name matches but UID differs: stale.
	if !hasStaleOwnerRef(&target, makeOwner("shirley", "789")) {
		t.Error("ownerB should be stale")
	}
	// Name not referenced at all: not stale.
	if hasStaleOwnerRef(&target, makeOwner("yvonne", "345")) {
		t.Error("ownerC should not be stale")
	}
}
// TestSetOwnerRef verifies that setOwnerRef adds an owner reference exactly
// once, reports whether the reference list changed, and appends additional
// distinct owners in order.
func TestSetOwnerRef(t *testing.T) {
	target := v1.Pod{}
	ownerA := v1.Pod{}
	ownerA.Name = "A"
	ownerA.GetObjectMeta().SetUID("ABC")
	// First add of a new owner must report an update.
	if !setOwnerRef(&target, &ownerA, &ownerA.TypeMeta) {
		t.Errorf("Unexpected lack of update")
	}
	ownerRefs := target.GetObjectMeta().GetOwnerReferences()
	if len(ownerRefs) != 1 {
		t.Errorf("Unexpected owner ref count: %d", len(ownerRefs))
	}
	if ownerRefs[0].UID != "ABC" {
		t.Errorf("Unexpected owner UID %v", ownerRefs[0].UID)
	}
	// Re-adding the same owner is a no-op and must not report an update.
	if setOwnerRef(&target, &ownerA, &ownerA.TypeMeta) {
		t.Errorf("Unexpected update")
	}
	if len(target.GetObjectMeta().GetOwnerReferences()) != 1 {
		t.Error("Unexpected duplicate reference")
	}
	// A second, distinct owner is appended after the first.
	ownerB := v1.Pod{}
	ownerB.Name = "B"
	ownerB.GetObjectMeta().SetUID("BCD")
	if !setOwnerRef(&target, &ownerB, &ownerB.TypeMeta) {
		t.Error("Unexpected lack of second update")
	}
	ownerRefs = target.GetObjectMeta().GetOwnerReferences()
	if len(ownerRefs) != 2 {
		t.Errorf("Unexpected owner ref count: %d", len(ownerRefs))
	}
	if ownerRefs[0].UID != "ABC" || ownerRefs[1].UID != "BCD" {
		t.Errorf("Bad second ownerRefs: %v", ownerRefs)
	}
}
// TestRemoveOwnerRef verifies that removeOwnerRef deletes only a matching
// owner reference and correctly reports whether the list changed.
func TestRemoveOwnerRef(t *testing.T) {
	target := v1.Pod{}
	ownerA := v1.Pod{}
	ownerA.Name = "A"
	ownerA.GetObjectMeta().SetUID("ABC")
	// Removing from an empty reference list is a no-op.
	if removeOwnerRef(&target, &ownerA) {
		t.Error("Unexpected update on empty remove")
	}
	setOwnerRef(&target, &ownerA, &ownerA.TypeMeta)
	// Removing a present owner reports an update and empties the list.
	if !removeOwnerRef(&target, &ownerA) {
		t.Error("Unexpected lack of update")
	}
	if len(target.GetObjectMeta().GetOwnerReferences()) != 0 {
		t.Error("Unexpected owner reference remains")
	}
	ownerB := v1.Pod{}
	ownerB.Name = "B"
	ownerB.GetObjectMeta().SetUID("BCD")
	setOwnerRef(&target, &ownerA, &ownerA.TypeMeta)
	// Removing a non-matching owner must leave the list untouched.
	if removeOwnerRef(&target, &ownerB) {
		t.Error("Unexpected update for mismatched owner")
	}
	if len(target.GetObjectMeta().GetOwnerReferences()) != 1 {
		t.Error("Missing ref after no-op remove")
	}
	setOwnerRef(&target, &ownerB, &ownerB.TypeMeta)
	// With both owners present, removing A leaves only B.
	if !removeOwnerRef(&target, &ownerA) {
		t.Error("Missing update for second remove")
	}
	ownerRefs := target.GetObjectMeta().GetOwnerReferences()
	if len(ownerRefs) != 1 {
		t.Error("Extra ref after second remove")
	}
	if ownerRefs[0].UID != "BCD" {
		t.Error("Bad UID after second remove")
	}
}
func TestIsRunningAndReady(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 1)
@ -387,7 +777,8 @@ func newPod() *v1.Pod {
func newPVC(name string) v1.PersistentVolumeClaim {
return v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
Name: name,
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
@ -452,6 +843,10 @@ func newStatefulSetWithVolumes(replicas int, name string, petMounts []v1.VolumeM
VolumeClaimTemplates: claims,
ServiceName: "governingsvc",
UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
PersistentVolumeClaimRetentionPolicy: &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
},
RevisionHistoryLimit: func() *int32 {
limit := int32(2)
return &limit
@ -499,6 +894,10 @@ func newStatefulSetWithLabels(replicas int, name string, uid types.UID, labels m
MatchExpressions: testMatchExpressions,
},
Replicas: func() *int32 { i := int32(replicas); return &i }(),
PersistentVolumeClaimRetentionPolicy: &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
@ -525,7 +924,7 @@ func newStatefulSetWithLabels(replicas int, name string, uid types.UID, labels m
},
VolumeClaimTemplates: []v1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{Name: "datadir"},
ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "datadir"},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{

View File

@ -20,6 +20,8 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"time"
@ -1142,6 +1144,7 @@ var _ = SIGDescribe("StatefulSet", func() {
appTester.run()
})
})
// Make sure minReadySeconds is honored
// Don't mark it as conformance yet
ginkgo.It("MinReadySeconds should be honored when enabled", func() {
@ -1202,6 +1205,144 @@ var _ = SIGDescribe("StatefulSet", func() {
framework.Failf("invalid number of availableReplicas: expected=%v received=%v", 2, out)
}
})
// E2e coverage for the StatefulSetAutoDeletePVC feature: PVCs created from
// volumeClaimTemplates are deleted automatically according to the set's
// PersistentVolumeClaimRetentionPolicy (WhenDeleted / WhenScaled).
ginkgo.Describe("Non-retain StatefulSetPersistentVolumeClaimPolicy [Feature:StatefulSetAutoDeletePVC]", func() {
	ssName := "ss"
	labels := map[string]string{
		"foo": "bar",
		"baz": "blah",
	}
	headlessSvcName := "test"
	var statefulPodMounts, podMounts []v1.VolumeMount
	var ss *appsv1.StatefulSet
	// Each test gets a fresh 2-replica StatefulSet spec (with a datadir
	// volume claim template) and the headless service it governs.
	ginkgo.BeforeEach(func() {
		statefulPodMounts = []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
		podMounts = []v1.VolumeMount{{Name: "home", MountPath: "/home"}}
		ss = e2estatefulset.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels)
		ginkgo.By("Creating service " + headlessSvcName + " in namespace " + ns)
		headlessService := e2eservice.CreateServiceSpec(headlessSvcName, "", true, labels)
		_, err := c.CoreV1().Services(ns).Create(context.TODO(), headlessService, metav1.CreateOptions{})
		framework.ExpectNoError(err)
	})
	// Dump debug info on failure, then clean up any StatefulSets left behind.
	ginkgo.AfterEach(func() {
		if ginkgo.CurrentGinkgoTestDescription().Failed {
			framework.DumpDebugInfo(c, ns)
		}
		framework.Logf("Deleting all statefulset in ns %v", ns)
		e2estatefulset.DeleteAllStatefulSets(c, ns)
	})
	// WhenDeleted=Delete: deleting the StatefulSet removes all of its PVCs.
	ginkgo.It("should delete PVCs with a WhenDeleted policy", func() {
		e2epv.SkipIfNoDefaultStorageClass(c)
		ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns)
		*(ss.Spec.Replicas) = 3
		ss.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
			WhenDeleted: appsv1.DeletePersistentVolumeClaimRetentionPolicyType,
		}
		_, err := c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		ginkgo.By("Confirming all 3 PVCs exist with their owner refs")
		err = verifyStatefulSetPVCsExistWithOwnerRefs(c, ss, []int{0, 1, 2}, true, false)
		framework.ExpectNoError(err)
		ginkgo.By("Deleting stateful set " + ss.Name)
		err = c.AppsV1().StatefulSets(ns).Delete(context.TODO(), ss.Name, metav1.DeleteOptions{})
		framework.ExpectNoError(err)
		ginkgo.By("Verifying PVCs deleted")
		err = verifyStatefulSetPVCsExist(c, ss, []int{})
		framework.ExpectNoError(err)
	})
	// WhenScaled=Delete: scaling 3 -> 1 removes the PVCs of the condemned
	// pods, keeping only ordinal 0's PVC.
	ginkgo.It("should delete PVCs with a OnScaledown policy", func() {
		e2epv.SkipIfNoDefaultStorageClass(c)
		ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns)
		*(ss.Spec.Replicas) = 3
		ss.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
			WhenScaled: appsv1.DeletePersistentVolumeClaimRetentionPolicyType,
		}
		_, err := c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		ginkgo.By("Confirming all 3 PVCs exist")
		err = verifyStatefulSetPVCsExist(c, ss, []int{0, 1, 2})
		framework.ExpectNoError(err)
		ginkgo.By("Scaling stateful set " + ss.Name + " to one replica")
		ss, err = e2estatefulset.Scale(c, ss, 1)
		framework.ExpectNoError(err)
		ginkgo.By("Verifying all but one PVC deleted")
		err = verifyStatefulSetPVCsExist(c, ss, []int{0})
		framework.ExpectNoError(err)
	})
	// A pod orphaned (owner refs stripped) and then re-adopted by the
	// controller should still have its PVC deleted under WhenDeleted=Delete.
	ginkgo.It("should delete PVCs after adopting pod (WhenDeleted)", func() {
		e2epv.SkipIfNoDefaultStorageClass(c)
		ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns)
		*(ss.Spec.Replicas) = 3
		ss.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
			WhenDeleted: appsv1.DeletePersistentVolumeClaimRetentionPolicyType,
		}
		_, err := c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		ginkgo.By("Confirming all 3 PVCs exist with their owner refs")
		err = verifyStatefulSetPVCsExistWithOwnerRefs(c, ss, []int{0, 1, 2}, true, false)
		framework.ExpectNoError(err)
		// Strip the pod's owner references so the controller must re-adopt it.
		ginkgo.By("Orphaning the 3rd pod")
		patch, err := json.Marshal(metav1.ObjectMeta{
			OwnerReferences: []metav1.OwnerReference{},
		})
		framework.ExpectNoError(err, "Could not Marshal JSON for patch payload")
		_, err = c.CoreV1().Pods(ns).Patch(context.TODO(), fmt.Sprintf("%s-2", ss.Name), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "")
		framework.ExpectNoError(err, "Could not patch payload")
		ginkgo.By("Deleting stateful set " + ss.Name)
		err = c.AppsV1().StatefulSets(ns).Delete(context.TODO(), ss.Name, metav1.DeleteOptions{})
		framework.ExpectNoError(err)
		ginkgo.By("Verifying PVCs deleted")
		err = verifyStatefulSetPVCsExist(c, ss, []int{})
		framework.ExpectNoError(err)
	})
	// Same adoption scenario, but the deletion trigger is a scale-down under
	// WhenScaled=Delete.
	// NOTE(review): the [Feature:StatefulSetAutoDeletePVC] tag in this It name
	// is redundant — the enclosing Describe already carries it; the sibling
	// WhenDeleted test omits it. Consider dropping for consistency.
	ginkgo.It("should delete PVCs after adopting pod (WhenScaled) [Feature:StatefulSetAutoDeletePVC]", func() {
		e2epv.SkipIfNoDefaultStorageClass(c)
		ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns)
		*(ss.Spec.Replicas) = 3
		ss.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
			WhenScaled: appsv1.DeletePersistentVolumeClaimRetentionPolicyType,
		}
		_, err := c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		ginkgo.By("Confirming all 3 PVCs exist")
		err = verifyStatefulSetPVCsExist(c, ss, []int{0, 1, 2})
		framework.ExpectNoError(err)
		// Strip the pod's owner references so the controller must re-adopt it.
		ginkgo.By("Orphaning the 3rd pod")
		patch, err := json.Marshal(metav1.ObjectMeta{
			OwnerReferences: []metav1.OwnerReference{},
		})
		framework.ExpectNoError(err, "Could not Marshal JSON for patch payload")
		_, err = c.CoreV1().Pods(ns).Patch(context.TODO(), fmt.Sprintf("%s-2", ss.Name), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "")
		framework.ExpectNoError(err, "Could not patch payload")
		ginkgo.By("Scaling stateful set " + ss.Name + " to one replica")
		ss, err = e2estatefulset.Scale(c, ss, 1)
		framework.ExpectNoError(err)
		ginkgo.By("Verifying all but one PVC deleted")
		err = verifyStatefulSetPVCsExist(c, ss, []int{0})
		framework.ExpectNoError(err)
	})
})
})
func kubectlExecWithRetries(ns string, args ...string) (out string) {
@ -1645,3 +1786,111 @@ func getStatefulSet(c clientset.Interface, namespace, name string) *appsv1.State
}
return ss
}
// verifyStatefulSetPVCsExist confirms that exactly the PVCs for ss with the
// specified ids exist. This polls until the situation occurs, an error
// happens, or until timeout (in the latter case an error is also returned).
// Beware that this cannot tell if a PVC will be deleted at some point in the
// future, so if used to confirm that no PVCs are deleted, the caller should
// wait for some event giving the PVCs a reasonable chance to be deleted,
// before calling this function.
func verifyStatefulSetPVCsExist(c clientset.Interface, ss *appsv1.StatefulSet, claimIds []int) error {
	idSet := map[int]struct{}{}
	for _, id := range claimIds {
		idSet[id] = struct{}{}
	}
	// PVC names are of the form <template name>-<set name>-<ordinal>. The
	// patterns are invariant across poll iterations, so compile them once
	// here instead of on every poll.
	claimRegexps := make([]*regexp.Regexp, 0, len(ss.Spec.VolumeClaimTemplates))
	for _, claim := range ss.Spec.VolumeClaimTemplates {
		claimRegexps = append(claimRegexps, regexp.MustCompile(fmt.Sprintf("^%s-%s-([0-9]+)$", claim.Name, ss.Name)))
	}
	return wait.PollImmediate(e2estatefulset.StatefulSetPoll, e2estatefulset.StatefulSetTimeout, func() (bool, error) {
		pvcList, err := c.CoreV1().PersistentVolumeClaims(ss.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()})
		if err != nil {
			framework.Logf("WARNING: Failed to list pvcs for verification, retrying: %v", err)
			return false, nil
		}
		for _, pvcNameRE := range claimRegexps {
			seenPVCs := map[int]struct{}{}
			for _, pvc := range pvcList.Items {
				matches := pvcNameRE.FindStringSubmatch(pvc.Name)
				if len(matches) != 2 {
					continue
				}
				ordinal, err := strconv.ParseInt(matches[1], 10, 32)
				if err != nil {
					framework.Logf("ERROR: bad pvc name %s (%v)", pvc.Name, err)
					return false, err
				}
				if _, found := idSet[int(ordinal)]; !found {
					// A PVC exists for an ordinal we expected deleted.
					return false, nil // Retry until the PVCs are consistent.
				}
				seenPVCs[int(ordinal)] = struct{}{}
			}
			if len(seenPVCs) != len(idSet) {
				framework.Logf("Found %d of %d PVCs", len(seenPVCs), len(idSet))
				return false, nil // Retry until the PVCs are consistent.
			}
		}
		return true, nil
	})
}
// verifyStatefulSetPVCsExistWithOwnerRefs works as verifyStatefulSetPVCsExist,
// but also waits for the ownerRefs to match: wantSetRef requires each PVC to
// have a StatefulSet owner reference carrying the set's UID, and wantPodRef
// requires a Pod owner reference carrying the corresponding pod's UID.
func verifyStatefulSetPVCsExistWithOwnerRefs(c clientset.Interface, ss *appsv1.StatefulSet, claimIndices []int, wantSetRef, wantPodRef bool) error {
	indexSet := map[int]struct{}{}
	for _, id := range claimIndices {
		indexSet[id] = struct{}{}
	}
	set := getStatefulSet(c, ss.Namespace, ss.Name)
	setUID := set.GetUID()
	if setUID == "" {
		framework.Failf("Statefulset %s missing UID", ss.Name)
	}
	// PVC names are of the form <template name>-<set name>-<ordinal>. The
	// patterns are invariant across poll iterations, so compile them once
	// here instead of on every poll.
	claimRegexps := make([]*regexp.Regexp, 0, len(ss.Spec.VolumeClaimTemplates))
	for _, claim := range ss.Spec.VolumeClaimTemplates {
		claimRegexps = append(claimRegexps, regexp.MustCompile(fmt.Sprintf("^%s-%s-([0-9]+)$", claim.Name, ss.Name)))
	}
	return wait.PollImmediate(e2estatefulset.StatefulSetPoll, e2estatefulset.StatefulSetTimeout, func() (bool, error) {
		pvcList, err := c.CoreV1().PersistentVolumeClaims(ss.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()})
		if err != nil {
			framework.Logf("WARNING: Failed to list pvcs for verification, retrying: %v", err)
			return false, nil
		}
		for _, pvcNameRE := range claimRegexps {
			seenPVCs := map[int]struct{}{}
			for _, pvc := range pvcList.Items {
				matches := pvcNameRE.FindStringSubmatch(pvc.Name)
				if len(matches) != 2 {
					continue
				}
				ordinal, err := strconv.ParseInt(matches[1], 10, 32)
				if err != nil {
					framework.Logf("ERROR: bad pvc name %s (%v)", pvc.Name, err)
					return false, err
				}
				if _, found := indexSet[int(ordinal)]; !found {
					framework.Logf("Unexpected, retrying")
					return false, nil // Retry until the PVCs are consistent.
				}
				var foundSetRef, foundPodRef bool
				for _, ref := range pvc.GetOwnerReferences() {
					if ref.Kind == "StatefulSet" && ref.UID == setUID {
						foundSetRef = true
					}
					if ref.Kind == "Pod" {
						// A Pod owner ref only counts when its UID matches the
						// live pod for this ordinal.
						podName := fmt.Sprintf("%s-%d", ss.Name, ordinal)
						pod, err := c.CoreV1().Pods(ss.Namespace).Get(context.TODO(), podName, metav1.GetOptions{})
						if err != nil {
							framework.Logf("Pod %s not found, retrying (%v)", podName, err)
							return false, nil
						}
						podUID := pod.GetUID()
						if podUID == "" {
							framework.Failf("Pod %s is missing UID", pod.Name)
						}
						if ref.UID == podUID {
							foundPodRef = true
						}
					}
				}
				// Only count this PVC as seen when its refs match expectations.
				if foundSetRef == wantSetRef && foundPodRef == wantPodRef {
					seenPVCs[int(ordinal)] = struct{}{}
				}
			}
			if len(seenPVCs) != len(indexSet) {
				framework.Logf("Only %d PVCs, retrying", len(seenPVCs))
				return false, nil // Retry until the PVCs are consistent.
			}
		}
		return true, nil
	})
}