Migrated the StatefulSet controller (within `kube-controller-manager) to use [contextual logging](https://k8s.io/docs/concepts/cluster-administration/system-logs/#contextual-logging)

This commit is contained in:
ZhangKe10140699 2022-11-11 13:33:41 +08:00
parent f99c351992
commit a239b9986b
8 changed files with 203 additions and 141 deletions

View File

@ -50,7 +50,9 @@ func startDaemonSetController(ctx context.Context, controllerContext ControllerC
}
func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "statefulset"))
go statefulset.NewStatefulSetController(
ctx,
controllerContext.InformerFactory.Core().V1().Pods(),
controllerContext.InformerFactory.Apps().V1().StatefulSets(),
controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),

View File

@ -126,7 +126,7 @@ func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.
}
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
// Set PVC policy as much as is possible at this point.
if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
}
@ -135,7 +135,7 @@ func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.
return err
}
func (spc *StatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
func (spc *StatefulPodControl) UpdateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
attemptedUpdate := false
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// assume the Pod is consistent
@ -158,11 +158,11 @@ func (spc *StatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
// if the Pod's PVCs are not consistent with the StatefulSet's PVC deletion policy, update the PVC
// and dirty the pod.
if match, err := spc.ClaimsMatchRetentionPolicy(set, pod); err != nil {
if match, err := spc.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
} else if !match {
if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
}
@ -207,7 +207,7 @@ func (spc *StatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.
// ClaimsMatchRetentionPolicy returns false if the PVCs for pod are not consistent with set's PVC deletion policy.
// An error is returned if something is not consistent. This is expected if the pod is being otherwise updated,
// but a problem otherwise (see usage of this method in UpdateStatefulPod).
func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(set *apps.StatefulSet, pod *v1.Pod) (bool, error) {
func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) (bool, error) {
ordinal := getOrdinal(pod)
templates := set.Spec.VolumeClaimTemplates
for i := range templates {
@ -215,7 +215,7 @@ func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(set *apps.StatefulSet,
claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName)
switch {
case apierrors.IsNotFound(err):
klog.V(4).Infof("Expected claim %s missing, continuing to pick up in next iteration", claimName)
klog.FromContext(ctx).V(4).Info("Expected claim missing, continuing to pick up in next iteration", "PVC", klog.KObj(claim))
case err != nil:
return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name)
default:
@ -228,7 +228,8 @@ func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(set *apps.StatefulSet,
}
// UpdatePodClaimForRetentionPolicy updates the PVCs used by pod to match the PVC deletion policy of set.
func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(set *apps.StatefulSet, pod *v1.Pod) error {
func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
logger := klog.FromContext(ctx)
ordinal := getOrdinal(pod)
templates := set.Spec.VolumeClaimTemplates
for i := range templates {
@ -236,7 +237,7 @@ func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(set *apps.Statef
claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName)
switch {
case apierrors.IsNotFound(err):
klog.V(4).Infof("Expected claim %s missing, continuing to pick up in next iteration.")
logger.V(4).Info("Expected claim missing, continuing to pick up in next iteration", "PVC", klog.KObj(claim))
case err != nil:
return fmt.Errorf("Could not retrieve claim %s not found for %s when checking PVC deletion policy: %w", claimName, pod.Name, err)
default:
@ -316,14 +317,14 @@ func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulS
}
// createMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy
func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
return err
}
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
// Set PVC policy as much as is possible at this point.
if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
}

View File

@ -25,7 +25,7 @@ import (
"time"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -37,6 +37,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/kubernetes/pkg/apis/apps/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/features"
@ -241,6 +242,7 @@ func TestStatefulPodControlCreatePodFailed(t *testing.T) {
}
func TestStatefulPodControlNoOpUpdate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
@ -257,7 +259,7 @@ func TestStatefulPodControlNoOpUpdate(t *testing.T) {
t.Error("no-op update should not make any client invocation")
return true, nil, apierrors.NewInternalError(errors.New("if we are here we have a problem"))
})
if err := control.UpdateStatefulPod(set, pod); err != nil {
if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
t.Errorf("Error returned on no-op update error: %s", err)
}
events := collectEvents(recorder.Events)
@ -267,6 +269,7 @@ func TestStatefulPodControlNoOpUpdate(t *testing.T) {
}
func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
@ -281,7 +284,7 @@ func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
return true, update.GetObject(), nil
})
pod.Name = "goo-0"
if err := control.UpdateStatefulPod(set, pod); err != nil {
if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
t.Errorf("Successful update returned an error: %s", err)
}
events := collectEvents(recorder.Events)
@ -296,6 +299,7 @@ func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
}
func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
@ -313,7 +317,7 @@ func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
pod.Name = "goo-0"
if err := control.UpdateStatefulPod(set, pod); err == nil {
if err := control.UpdateStatefulPod(ctx, set, pod); err == nil {
t.Error("Failed update does not generate an error")
}
events := collectEvents(recorder.Events)
@ -328,6 +332,7 @@ func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) {
}
func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
@ -357,7 +362,7 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
updated = update.GetObject().(*v1.Pod)
return true, update.GetObject(), nil
})
if err := control.UpdateStatefulPod(set, pod); err != nil {
if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
t.Errorf("Successful update returned an error: %s", err)
}
events := collectEvents(recorder.Events)
@ -375,6 +380,7 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
}
func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
@ -397,7 +403,7 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
if err := control.UpdateStatefulPod(set, pod); err == nil {
if err := control.UpdateStatefulPod(ctx, set, pod); err == nil {
t.Error("Failed Pod storage update did not return an error")
}
events := collectEvents(recorder.Events)
@ -412,6 +418,7 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
}
func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
@ -440,7 +447,7 @@ func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
})
pod.Labels[apps.StatefulSetPodNameLabel] = "goo-0"
if err := control.UpdateStatefulPod(set, pod); err != nil {
if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
t.Errorf("Successful update returned an error: %s", err)
}
events := collectEvents(recorder.Events)
@ -497,6 +504,7 @@ func TestStatefulPodControlDeleteFailure(t *testing.T) {
func TestStatefulPodControlClaimsMatchDeletionPolcy(t *testing.T) {
// The claimOwnerMatchesSetAndPod is tested exhaustively in stateful_set_utils_test; this
// test is for the wiring to the method tested there.
_, ctx := ktesting.NewTestContext(t)
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
@ -512,7 +520,7 @@ func TestStatefulPodControlClaimsMatchDeletionPolcy(t *testing.T) {
WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
}
if matches, err := control.ClaimsMatchRetentionPolicy(set, pod); err != nil {
if matches, err := control.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil {
t.Errorf("Unexpected error for ClaimsMatchRetentionPolicy (retain): %v", err)
} else if !matches {
t.Error("Unexpected non-match for ClaimsMatchRetentionPolicy (retain)")
@ -521,7 +529,7 @@ func TestStatefulPodControlClaimsMatchDeletionPolcy(t *testing.T) {
WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
}
if matches, err := control.ClaimsMatchRetentionPolicy(set, pod); err != nil {
if matches, err := control.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil {
t.Errorf("Unexpected error for ClaimsMatchRetentionPolicy (set deletion): %v", err)
} else if matches {
t.Error("Unexpected match for ClaimsMatchRetentionPolicy (set deletion)")
@ -532,6 +540,7 @@ func TestStatefulPodControlUpdatePodClaimForRetentionPolicy(t *testing.T) {
// All the update conditions are tested exhaustively in stateful_set_utils_test. This
// tests the wiring from the pod control to that method.
testFn := func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
fakeClient := &fake.Clientset{}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
@ -554,7 +563,7 @@ func TestStatefulPodControlUpdatePodClaimForRetentionPolicy(t *testing.T) {
WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
}
if err := control.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
if err := control.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
t.Errorf("Unexpected error for UpdatePodClaimForRetentionPolicy (retain): %v", err)
}
expectRef := utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC)
@ -681,6 +690,7 @@ func TestPodClaimIsStale(t *testing.T) {
func TestStatefulPodControlRetainDeletionPolicyUpdate(t *testing.T) {
testFn := func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(1)
set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
@ -704,7 +714,7 @@ func TestStatefulPodControlRetainDeletionPolicyUpdate(t *testing.T) {
claimIndexer.Add(&claim)
}
control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
if err := control.UpdateStatefulPod(set, pod); err != nil {
if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
t.Errorf("Successful update returned an error: %s", err)
}
for k := range claims {
@ -735,6 +745,7 @@ func TestStatefulPodControlRetainDeletionPolicyUpdate(t *testing.T) {
}
func TestStatefulPodControlRetentionPolicyUpdate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// Only applicable when the feature gate is on; the off case is tested in TestStatefulPodControlRetainRetentionPolicyUpdate.
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
@ -766,7 +777,7 @@ func TestStatefulPodControlRetentionPolicyUpdate(t *testing.T) {
podLister := corelisters.NewPodLister(podIndexer)
claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
if err := control.UpdateStatefulPod(set, pod); err != nil {
if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
t.Errorf("Successful update returned an error: %s", err)
}
updatedClaim, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
@ -783,6 +794,7 @@ func TestStatefulPodControlRetentionPolicyUpdate(t *testing.T) {
}
func TestStatefulPodControlRetentionPolicyUpdateMissingClaims(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
// Only applicable when the feature gate is on.
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
@ -805,7 +817,7 @@ func TestStatefulPodControlRetentionPolicyUpdateMissingClaims(t *testing.T) {
return true, update.GetObject(), nil
})
control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
if err := control.UpdateStatefulPod(set, pod); err != nil {
if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
t.Error("Unexpected error on pod update when PVCs are missing")
}
claims := getPersistentVolumeClaims(set, pod)
@ -818,7 +830,7 @@ func TestStatefulPodControlRetentionPolicyUpdateMissingClaims(t *testing.T) {
claimIndexer.Add(&claim)
}
if err := control.UpdateStatefulPod(set, pod); err != nil {
if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
t.Errorf("Expected update to succeed, saw error %v", err)
}
updatedClaim, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)

View File

@ -78,12 +78,14 @@ type StatefulSetController struct {
// NewStatefulSetController creates a new statefulset controller.
func NewStatefulSetController(
ctx context.Context,
podInformer coreinformers.PodInformer,
setInformer appsinformers.StatefulSetInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
revInformer appsinformers.ControllerRevisionInformer,
kubeClient clientset.Interface,
) *StatefulSetController {
logger := klog.FromContext(ctx)
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
ssc := &StatefulSetController{
@ -108,11 +110,17 @@ func NewStatefulSetController(
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// lookup the statefulset and enqueue
AddFunc: ssc.addPod,
AddFunc: func(obj interface{}) {
ssc.addPod(logger, obj)
},
// lookup current and old statefulset if labels changed
UpdateFunc: ssc.updatePod,
UpdateFunc: func(oldObj, newObj interface{}) {
ssc.updatePod(logger, oldObj, newObj)
},
// lookup statefulset accounting for deletion tombstones
DeleteFunc: ssc.deletePod,
DeleteFunc: func(obj interface{}) {
ssc.deletePod(logger, obj)
},
})
ssc.podLister = podInformer.Lister()
ssc.podListerSynced = podInformer.Informer().HasSynced
@ -124,7 +132,7 @@ func NewStatefulSetController(
oldPS := old.(*apps.StatefulSet)
curPS := cur.(*apps.StatefulSet)
if oldPS.Status.Replicas != curPS.Status.Replicas {
klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
logger.V(4).Info("Observed updated replica count for StatefulSet", "statefulSet", klog.KObj(curPS), "oldReplicas", oldPS.Status.Replicas, "newReplicas", curPS.Status.Replicas)
}
ssc.enqueueStatefulSet(cur)
},
@ -149,8 +157,9 @@ func (ssc *StatefulSetController) Run(ctx context.Context, workers int) {
defer ssc.queue.ShutDown()
klog.Infof("Starting stateful set controller")
defer klog.Infof("Shutting down statefulset controller")
logger := klog.FromContext(ctx)
logger.Info("Starting stateful set controller")
defer logger.Info("Shutting down statefulset controller")
if !cache.WaitForNamedCacheSync("stateful set", ctx.Done(), ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
return
@ -164,13 +173,13 @@ func (ssc *StatefulSetController) Run(ctx context.Context, workers int) {
}
// addPod adds the statefulset for the pod to the sync queue
func (ssc *StatefulSetController) addPod(obj interface{}) {
func (ssc *StatefulSetController) addPod(logger klog.Logger, obj interface{}) {
pod := obj.(*v1.Pod)
if pod.DeletionTimestamp != nil {
// on a restart of the controller manager, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
ssc.deletePod(pod)
ssc.deletePod(logger, pod)
return
}
@ -180,7 +189,7 @@ func (ssc *StatefulSetController) addPod(obj interface{}) {
if set == nil {
return
}
klog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels)
logger.V(4).Info("Pod created with labels", "pod", klog.KObj(pod), "labels", pod.Labels)
ssc.enqueueStatefulSet(set)
return
}
@ -191,14 +200,14 @@ func (ssc *StatefulSetController) addPod(obj interface{}) {
if len(sets) == 0 {
return
}
klog.V(4).Infof("Orphan Pod %s created, labels: %+v", pod.Name, pod.Labels)
logger.V(4).Info("Orphan Pod created with labels", "pod", klog.KObj(pod), "labels", pod.Labels)
for _, set := range sets {
ssc.enqueueStatefulSet(set)
}
}
// updatePod adds the statefulset for the current and old pods to the sync queue.
func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
if curPod.ResourceVersion == oldPod.ResourceVersion {
@ -225,13 +234,13 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
if set == nil {
return
}
klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
ssc.enqueueStatefulSet(set)
// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
// the Pod status which in turn will trigger a requeue of the owning replica set thus
// having its status updated with the newly available replica.
if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 {
klog.V(2).Infof("StatefulSet %s will be enqueued after %ds for availability check", set.Name, set.Spec.MinReadySeconds)
logger.V(2).Info("StatefulSet will be enqueued after minReadySeconds for availability check", "statefulSet", klog.KObj(set), "minReadySeconds", set.Spec.MinReadySeconds)
// Add a second to avoid milliseconds skew in AddAfter.
// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
ssc.enqueueSSAfter(set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second)
@ -246,7 +255,7 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
if len(sets) == 0 {
return
}
klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
logger.V(4).Info("Orphan Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta)
for _, set := range sets {
ssc.enqueueStatefulSet(set)
}
@ -254,7 +263,7 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
}
// deletePod enqueues the statefulset for the pod accounting for deletion tombstones.
func (ssc *StatefulSetController) deletePod(obj interface{}) {
func (ssc *StatefulSetController) deletePod(logger klog.Logger, obj interface{}) {
pod, ok := obj.(*v1.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
@ -282,7 +291,7 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) {
if set == nil {
return
}
klog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller())
logger.V(4).Info("Pod deleted.", "pod", klog.KObj(pod), "caller", utilruntime.GetCaller())
ssc.enqueueStatefulSet(set)
}
@ -435,8 +444,9 @@ func (ssc *StatefulSetController) worker(ctx context.Context) {
// sync syncs the given statefulset.
func (ssc *StatefulSetController) sync(ctx context.Context, key string) error {
startTime := time.Now()
logger := klog.FromContext(ctx)
defer func() {
klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
logger.V(4).Info("Finished syncing statefulset", "key", key, "time", time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@ -445,7 +455,7 @@ func (ssc *StatefulSetController) sync(ctx context.Context, key string) error {
}
set, err := ssc.setLister.StatefulSets(namespace).Get(name)
if errors.IsNotFound(err) {
klog.Infof("StatefulSet has been deleted %v", key)
logger.Info("StatefulSet has been deleted", "key", key)
return nil
}
if err != nil {
@ -474,14 +484,15 @@ func (ssc *StatefulSetController) sync(ctx context.Context, key string) error {
// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
func (ssc *StatefulSetController) syncStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) error {
klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
logger := klog.FromContext(ctx)
logger.V(4).Info("Syncing StatefulSet with pods", "statefulSet", klog.KObj(set), "pods", len(pods))
var status *apps.StatefulSetStatus
var err error
status, err = ssc.control.UpdateStatefulSet(ctx, set, pods)
if err != nil {
return err
}
klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
logger.V(4).Info("Successfully synced StatefulSet", "statefulSet", klog.KObj(set))
// One more sync to handle the clock skew. This is also helping in requeuing right after status update
if set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)

View File

@ -97,6 +97,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set
func (ssc *defaultStatefulSetControl) performUpdate(
ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
var currentStatus *apps.StatefulSetStatus
logger := klog.FromContext(ctx)
// get the current, and update revisions
currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
if err != nil {
@ -112,7 +113,7 @@ func (ssc *defaultStatefulSetControl) performUpdate(
// make sure to update the latest status even if there is an error with non-nil currentStatus
statusErr := ssc.updateStatefulSetStatus(ctx, set, currentStatus)
if statusErr == nil {
klog.V(4).InfoS("Updated status", "statefulSet", klog.KObj(set),
logger.V(4).Info("Updated status", "statefulSet", klog.KObj(set),
"replicas", currentStatus.Replicas,
"readyReplicas", currentStatus.ReadyReplicas,
"currentReplicas", currentStatus.CurrentReplicas,
@ -129,7 +130,7 @@ func (ssc *defaultStatefulSetControl) performUpdate(
return currentRevision, updateRevision, currentStatus, statusErr
}
klog.V(4).InfoS("StatefulSet revisions", "statefulSet", klog.KObj(set),
logger.V(4).Info("StatefulSet revisions", "statefulSet", klog.KObj(set),
"currentRevision", currentStatus.CurrentRevision,
"updateRevision", currentStatus.UpdateRevision)
@ -284,6 +285,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
updateRevision *apps.ControllerRevision,
collisionCount int32,
pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
logger := klog.FromContext(ctx)
// get the current and update revisions of the set.
currentSet, err := ApplyRevision(set, currentRevision)
if err != nil {
@ -380,7 +382,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
}
if unhealthy > 0 {
klog.V(4).InfoS("StatefulSet has unhealthy Pods", "statefulSet", klog.KObj(set), "unhealthyReplicas", unhealthy, "pod", klog.KObj(firstUnhealthyPod))
logger.V(4).Info("StatefulSet has unhealthy Pods", "statefulSet", klog.KObj(set), "unhealthyReplicas", unhealthy, "pod", klog.KObj(firstUnhealthyPod))
}
// If the StatefulSet is being deleted, don't do anything other than updating
@ -453,14 +455,14 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
set.Namespace,
set.Name,
replicas[i].Name)
if err := ssc.podControl.createMissingPersistentVolumeClaims(set, replicas[i]); err != nil {
if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
return &status, err
}
}
// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) && monotonic {
klog.V(4).InfoS("StatefulSet is waiting for Pod to Terminate",
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return &status, nil
}
@ -468,7 +470,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Running and Ready.
if !isRunningAndReady(replicas[i]) && monotonic {
klog.V(4).InfoS("StatefulSet is waiting for Pod to be Running and Ready",
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return &status, nil
}
@ -476,7 +478,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
// ordinal, are Available.
if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
klog.V(4).InfoS("StatefulSet is waiting for Pod to be Available",
logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return &status, nil
}
@ -484,7 +486,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
retentionMatch := true
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
var err error
retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, replicas[i])
retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
// An error is expected if the pod is not yet fully updated, and so return is treated as matching.
if err != nil {
retentionMatch = true
@ -495,7 +497,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
}
// Make a deep copy so we don't mutate the shared cache
replica := replicas[i].DeepCopy()
if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
return &status, err
}
}
@ -503,10 +505,10 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
// Ensure ownerRefs are set correctly for the condemned pods.
for i := range condemned {
if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, condemned[i]); err != nil {
if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
return &status, err
} else if !matchPolicy {
if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(updateSet, condemned[i]); err != nil {
if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
return &status, err
}
}
@ -521,7 +523,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
for target := len(condemned) - 1; target >= 0; target-- {
// wait for terminating pods to expire
if isTerminating(condemned[target]) {
klog.V(4).InfoS("StatefulSet is waiting for Pod to Terminate prior to scale down",
logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))
// block if we are in monotonic mode
if monotonic {
@ -531,17 +533,17 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
}
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
klog.V(4).InfoS("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
return &status, nil
}
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
if !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod {
klog.V(4).InfoS("StatefulSet is waiting for Pod to be Available prior to scale down",
logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
return &status, nil
}
klog.V(2).InfoS("Pod of StatefulSet is terminating for scale down",
logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))
if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
@ -583,7 +585,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// delete the Pod if it is not already terminating and does not match the update revision.
if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
klog.V(2).InfoS("Pod of StatefulSet is terminating for update",
logger.V(2).Info("Pod of StatefulSet is terminating for update",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
if !errors.IsNotFound(err) {
@ -596,7 +598,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// wait for unhealthy Pods on update
if !isHealthy(replicas[target]) {
klog.V(4).InfoS("StatefulSet is waiting for Pod to update",
logger.V(4).Info("StatefulSet is waiting for Pod to update",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
return &status, nil
}
@ -614,6 +616,7 @@ func updateStatefulSetAfterInvariantEstablished(
status apps.StatefulSetStatus,
) (*apps.StatefulSetStatus, error) {
logger := klog.FromContext(ctx)
replicaCount := int(*set.Spec.Replicas)
// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
@ -644,11 +647,10 @@ func updateStatefulSetAfterInvariantEstablished(
}
if unavailablePods >= maxUnavailable {
klog.V(2).Infof("StatefulSet %s/%s found unavailablePods %v, more than or equal to allowed maxUnavailable %v",
set.Namespace,
set.Name,
unavailablePods,
maxUnavailable)
logger.V(2).Info("StatefulSet found unavailablePods, more than or equal to allowed maxUnavailable",
"statefulSet", klog.KObj(set),
"unavailablePods", unavailablePods,
"maxUnavailable", maxUnavailable)
return &status, nil
}
@ -662,10 +664,9 @@ func updateStatefulSetAfterInvariantEstablished(
// delete the Pod if it is healthy and the revision doesnt match the target
if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
// delete the Pod if it is healthy and the revision doesnt match the target
klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
set.Namespace,
set.Name,
replicas[target].Name)
logger.V(2).Info("StatefulSet terminating Pod for update",
"statefulSet", klog.KObj(set),
"pod", klog.KObj(replicas[target]))
if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
if !errors.IsNotFound(err) {
return &status, err

View File

@ -39,6 +39,8 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"
@ -50,8 +52,9 @@ func alwaysReady() bool { return true }
func TestStatefulSetControllerCreates(t *testing.T) {
set := newStatefulSet(3)
ssc, spc, om, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
logger, ctx := ktesting.NewTestContext(t)
ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
if obj, _, err := om.setsIndexer.Get(set); err != nil {
@ -66,8 +69,9 @@ func TestStatefulSetControllerCreates(t *testing.T) {
func TestStatefulSetControllerDeletes(t *testing.T) {
set := newStatefulSet(3)
ssc, spc, om, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
logger, ctx := ktesting.NewTestContext(t)
ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
if obj, _, err := om.setsIndexer.Get(set); err != nil {
@ -79,7 +83,7 @@ func TestStatefulSetControllerDeletes(t *testing.T) {
t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
}
*set.Spec.Replicas = 0
if err := scaleDownStatefulSetController(set, ssc, spc, om); err != nil {
if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn down StatefulSet : %s", err)
}
if obj, _, err := om.setsIndexer.Get(set); err != nil {
@ -94,8 +98,9 @@ func TestStatefulSetControllerDeletes(t *testing.T) {
func TestStatefulSetControllerRespectsTermination(t *testing.T) {
set := newStatefulSet(3)
ssc, spc, om, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
logger, ctx := ktesting.NewTestContext(t)
ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
if obj, _, err := om.setsIndexer.Get(set); err != nil {
@ -114,7 +119,7 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) {
if err != nil {
t.Error(err)
}
ssc.syncStatefulSet(context.TODO(), set, pods)
ssc.syncStatefulSet(ctx, set, pods)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
@ -130,7 +135,7 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) {
spc.DeleteStatefulPod(set, pods[3])
spc.DeleteStatefulPod(set, pods[4])
*set.Spec.Replicas = 0
if err := scaleDownStatefulSetController(set, ssc, spc, om); err != nil {
if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn down StatefulSet : %s", err)
}
if obj, _, err := om.setsIndexer.Get(set); err != nil {
@ -144,9 +149,10 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) {
}
func TestStatefulSetControllerBlocksScaling(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
set := newStatefulSet(3)
ssc, spc, om, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
if obj, _, err := om.setsIndexer.Get(set); err != nil {
@ -191,9 +197,10 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) {
}
func TestStatefulSetControllerDeletionTimestamp(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
set := newStatefulSet(3)
set.DeletionTimestamp = new(metav1.Time)
ssc, _, om, _ := newFakeStatefulSetController(set)
ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
om.setsIndexer.Add(set)
@ -215,10 +222,11 @@ func TestStatefulSetControllerDeletionTimestamp(t *testing.T) {
}
func TestStatefulSetControllerDeletionTimestampRace(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
set := newStatefulSet(3)
// The bare client says it IS deleted.
set.DeletionTimestamp = new(metav1.Time)
ssc, _, om, ssh := newFakeStatefulSetController(set)
ssc, _, om, ssh := newFakeStatefulSetController(ctx, set)
// The lister (cache) says it's NOT deleted.
set2 := *set
@ -279,7 +287,8 @@ func TestStatefulSetControllerDeletionTimestampRace(t *testing.T) {
}
func TestStatefulSetControllerAddPod(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
pod1 := newStatefulSetPod(set1, 0)
@ -287,7 +296,7 @@ func TestStatefulSetControllerAddPod(t *testing.T) {
om.setsIndexer.Add(set1)
om.setsIndexer.Add(set2)
ssc.addPod(pod1)
ssc.addPod(logger, pod1)
key, done := ssc.queue.Get()
if key == nil || done {
t.Error("failed to enqueue StatefulSet")
@ -298,7 +307,7 @@ func TestStatefulSetControllerAddPod(t *testing.T) {
}
ssc.queue.Done(key)
ssc.addPod(pod2)
ssc.addPod(logger, pod2)
key, done = ssc.queue.Get()
if key == nil || done {
t.Error("failed to enqueue StatefulSet")
@ -311,7 +320,8 @@ func TestStatefulSetControllerAddPod(t *testing.T) {
}
func TestStatefulSetControllerAddPodOrphan(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
@ -325,17 +335,18 @@ func TestStatefulSetControllerAddPodOrphan(t *testing.T) {
// Make pod an orphan. Expect matching sets to be queued.
pod.OwnerReferences = nil
ssc.addPod(pod)
ssc.addPod(logger, pod)
if got, want := ssc.queue.Len(), 2; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
}
func TestStatefulSetControllerAddPodNoSet(t *testing.T) {
ssc, _, _, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, _, _ := newFakeStatefulSetController(ctx)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
ssc.addPod(pod)
ssc.addPod(logger, pod)
ssc.queue.ShutDown()
key, _ := ssc.queue.Get()
if key != nil {
@ -344,7 +355,8 @@ func TestStatefulSetControllerAddPodNoSet(t *testing.T) {
}
func TestStatefulSetControllerUpdatePod(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
@ -355,7 +367,7 @@ func TestStatefulSetControllerUpdatePod(t *testing.T) {
prev := *pod1
fakeResourceVersion(pod1)
ssc.updatePod(&prev, pod1)
ssc.updatePod(logger, &prev, pod1)
key, done := ssc.queue.Get()
if key == nil || done {
t.Error("failed to enqueue StatefulSet")
@ -367,7 +379,7 @@ func TestStatefulSetControllerUpdatePod(t *testing.T) {
prev = *pod2
fakeResourceVersion(pod2)
ssc.updatePod(&prev, pod2)
ssc.updatePod(logger, &prev, pod2)
key, done = ssc.queue.Get()
if key == nil || done {
t.Error("failed to enqueue StatefulSet")
@ -379,12 +391,13 @@ func TestStatefulSetControllerUpdatePod(t *testing.T) {
}
func TestStatefulSetControllerUpdatePodWithNoSet(t *testing.T) {
ssc, _, _, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, _, _ := newFakeStatefulSetController(ctx)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
prev := *pod
fakeResourceVersion(pod)
ssc.updatePod(&prev, pod)
ssc.updatePod(logger, &prev, pod)
ssc.queue.ShutDown()
key, _ := ssc.queue.Get()
if key != nil {
@ -393,11 +406,12 @@ func TestStatefulSetControllerUpdatePodWithNoSet(t *testing.T) {
}
func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
om.setsIndexer.Add(set)
ssc.updatePod(pod, pod)
ssc.updatePod(logger, pod, pod)
ssc.queue.ShutDown()
key, _ := ssc.queue.Get()
if key != nil {
@ -406,7 +420,8 @@ func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) {
}
func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
pod.OwnerReferences = nil
@ -417,14 +432,15 @@ func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) {
clone := *pod
clone.Labels = map[string]string{"foo2": "bar2"}
fakeResourceVersion(&clone)
ssc.updatePod(&clone, pod)
ssc.updatePod(logger, &clone, pod)
if got, want := ssc.queue.Len(), 2; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
}
func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
@ -435,14 +451,15 @@ func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) {
clone := *pod
clone.OwnerReferences = pod2.OwnerReferences
fakeResourceVersion(&clone)
ssc.updatePod(&clone, pod)
ssc.updatePod(logger, &clone, pod)
if got, want := ssc.queue.Len(), 2; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
}
func TestStatefulSetControllerUpdatePodRelease(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
@ -452,14 +469,15 @@ func TestStatefulSetControllerUpdatePodRelease(t *testing.T) {
clone := *pod
clone.OwnerReferences = nil
fakeResourceVersion(&clone)
ssc.updatePod(pod, &clone)
ssc.updatePod(logger, pod, &clone)
if got, want := ssc.queue.Len(), 2; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
}
func TestStatefulSetControllerDeletePod(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
@ -468,7 +486,7 @@ func TestStatefulSetControllerDeletePod(t *testing.T) {
om.setsIndexer.Add(set1)
om.setsIndexer.Add(set2)
ssc.deletePod(pod1)
ssc.deletePod(logger, pod1)
key, done := ssc.queue.Get()
if key == nil || done {
t.Error("failed to enqueue StatefulSet")
@ -478,7 +496,7 @@ func TestStatefulSetControllerDeletePod(t *testing.T) {
t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
}
ssc.deletePod(pod2)
ssc.deletePod(logger, pod2)
key, done = ssc.queue.Get()
if key == nil || done {
t.Error("failed to enqueue StatefulSet")
@ -490,7 +508,8 @@ func TestStatefulSetControllerDeletePod(t *testing.T) {
}
func TestStatefulSetControllerDeletePodOrphan(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
@ -499,20 +518,21 @@ func TestStatefulSetControllerDeletePodOrphan(t *testing.T) {
om.setsIndexer.Add(set2)
pod1.OwnerReferences = nil
ssc.deletePod(pod1)
ssc.deletePod(logger, pod1)
if got, want := ssc.queue.Len(), 0; got != want {
t.Errorf("queue.Len() = %v, want %v", got, want)
}
}
func TestStatefulSetControllerDeletePodTombstone(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
logger, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
om.setsIndexer.Add(set)
tombstoneKey, _ := controller.KeyFunc(pod)
tombstone := cache.DeletedFinalStateUnknown{Key: tombstoneKey, Obj: pod}
ssc.deletePod(tombstone)
ssc.deletePod(logger, tombstone)
key, done := ssc.queue.Get()
if key == nil || done {
t.Error("failed to enqueue StatefulSet")
@ -524,7 +544,8 @@ func TestStatefulSetControllerDeletePodTombstone(t *testing.T) {
}
func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) {
ssc, _, om, _ := newFakeStatefulSetController()
_, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx)
set1 := newStatefulSet(3)
set2 := newStatefulSet(3)
set2.Name = "foo2"
@ -553,7 +574,8 @@ func TestGetPodsForStatefulSetAdopt(t *testing.T) {
pod4.OwnerReferences = nil
pod4.Name = "x" + pod4.Name
ssc, _, om, _ := newFakeStatefulSetController(set, pod1, pod2, pod3, pod4)
_, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx, set, pod1, pod2, pod3, pod4)
om.podsIndexer.Add(pod1)
om.podsIndexer.Add(pod2)
@ -595,7 +617,8 @@ func TestAdoptOrphanRevisions(t *testing.T) {
ss1Rev2.Namespace = ss1.Namespace
ss1Rev2.OwnerReferences = []metav1.OwnerReference{}
ssc, _, om, _ := newFakeStatefulSetController(ss1, ss1Rev1, ss1Rev2)
_, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx, ss1, ss1Rev1, ss1Rev2)
om.revisionsIndexer.Add(ss1Rev1)
om.revisionsIndexer.Add(ss1Rev2)
@ -621,8 +644,9 @@ func TestAdoptOrphanRevisions(t *testing.T) {
}
func TestGetPodsForStatefulSetRelease(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
set := newStatefulSet(3)
ssc, _, om, _ := newFakeStatefulSetController(set)
ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
pod1 := newStatefulSetPod(set, 1)
// pod2 is owned but has wrong name.
pod2 := newStatefulSetPod(set, 2)
@ -667,7 +691,8 @@ func TestOrphanedPodsWithPVCDeletePolicy(t *testing.T) {
*set.Spec.Replicas = 2
set.Spec.PersistentVolumeClaimRetentionPolicy.WhenScaled = scaledownPolicy
set.Spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted = deletionPolicy
ssc, _, om, _ := newFakeStatefulSetController(set)
_, ctx := ktesting.NewTestContext(t)
ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
om.setsIndexer.Add(set)
pods := []*v1.Pod{}
@ -811,8 +836,9 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) {
}
set := newStatefulSet(3)
set.Spec.PersistentVolumeClaimRetentionPolicy = policy
ssc, spc, om, _ := newFakeStatefulSetController(set)
if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
logger, ctx := ktesting.NewTestContext(t)
ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
t.Errorf(onPolicy("Failed to turn up StatefulSet : %s", err))
}
var err error
@ -823,7 +849,7 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) {
t.Errorf(onPolicy("set.Status.Replicas = %v; want 3", set.Status.Replicas))
}
*set.Spec.Replicas = 2
if err := scaleDownStatefulSetController(set, ssc, spc, om); err != nil {
if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil {
t.Errorf(onPolicy("Failed to scale down StatefulSet : msg, %s", err))
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -857,7 +883,7 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) {
*set.Spec.Replicas = 3
// Until the stale PVC goes away, the scale up should never finish. Run 10 iterations, then delete the PVC.
if err := scaleUpStatefulSetControllerBounded(set, ssc, spc, om, 10); err != nil {
if err := scaleUpStatefulSetControllerBounded(logger, set, ssc, spc, om, 10); err != nil {
t.Errorf(onPolicy("Failed attempt to scale StatefulSet back up: %v", err))
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -890,7 +916,7 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) {
t.Errorf(onPolicy("Could not delete stale pvc: %v", err))
}
if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil {
if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
t.Errorf(onPolicy("Failed to scale StatefulSet back up: %v", err))
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -903,13 +929,14 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) {
}
}
func newFakeStatefulSetController(initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) {
func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) {
client := fake.NewSimpleClientset(initialObjects...)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
om := newFakeObjectManager(informerFactory)
spc := NewStatefulPodControlFromManager(om, &noopRecorder{})
ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
ssc := NewStatefulSetController(
ctx,
informerFactory.Core().V1().Pods(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().PersistentVolumeClaims(),
@ -940,11 +967,11 @@ func getPodAtOrdinal(pods []*v1.Pod, ordinal int) *v1.Pod {
return pods[ordinal]
}
func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error {
return scaleUpStatefulSetControllerBounded(set, ssc, spc, om, -1)
func scaleUpStatefulSetController(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error {
return scaleUpStatefulSetControllerBounded(logger, set, ssc, spc, om, -1)
}
func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager, maxIterations int) error {
func scaleUpStatefulSetControllerBounded(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager, maxIterations int) error {
om.setsIndexer.Add(set)
ssc.enqueueStatefulSet(set)
fakeWorker(ssc)
@ -964,7 +991,7 @@ func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSet
return err
}
pod := getPodAtOrdinal(pods, ord)
ssc.addPod(pod)
ssc.addPod(logger, pod)
fakeWorker(ssc)
pod = getPodAtOrdinal(pods, ord)
prev := *pod
@ -972,7 +999,7 @@ func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSet
return err
}
pod = getPodAtOrdinal(pods, ord)
ssc.updatePod(&prev, pod)
ssc.updatePod(logger, &prev, pod)
fakeWorker(ssc)
pod = getPodAtOrdinal(pods, ord)
prev = *pod
@ -980,7 +1007,7 @@ func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSet
return err
}
pod = getPodAtOrdinal(pods, ord)
ssc.updatePod(&prev, pod)
ssc.updatePod(logger, &prev, pod)
fakeWorker(ssc)
if err := assertMonotonicInvariants(set, om); err != nil {
return err
@ -995,7 +1022,7 @@ func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSet
return assertMonotonicInvariants(set, om)
}
func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error {
func scaleDownStatefulSetController(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return err
@ -1016,10 +1043,10 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr
return err
}
pod = getPodAtOrdinal(pods, ord)
ssc.updatePod(&prev, pod)
ssc.updatePod(logger, &prev, pod)
fakeWorker(ssc)
spc.DeleteStatefulPod(set, pod)
ssc.deletePod(pod)
ssc.deletePod(logger, pod)
fakeWorker(ssc)
for set.Status.Replicas > *set.Spec.Replicas {
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
@ -1033,10 +1060,10 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr
return err
}
pod = getPodAtOrdinal(pods, ord)
ssc.updatePod(&prev, pod)
ssc.updatePod(logger, &prev, pod)
fakeWorker(ssc)
spc.DeleteStatefulPod(set, pod)
ssc.deletePod(pod)
ssc.deletePod(logger, pod)
fakeWorker(ssc)
obj, _, err := om.setsIndexer.Get(set)
if err != nil {

View File

@ -36,6 +36,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/statefulset"
@ -123,7 +124,8 @@ func TestVolumeTemplateNoopUpdate(t *testing.T) {
}
func TestSpecReplicasChange(t *testing.T) {
closeFn, rm, informers, c := scSetup(t)
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-spec-replicas-change", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@ -166,7 +168,8 @@ func TestSpecReplicasChange(t *testing.T) {
}
func TestDeletingAndFailedPods(t *testing.T) {
closeFn, rm, informers, c := scSetup(t)
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-deleting-and-failed-pods", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@ -257,7 +260,8 @@ func TestStatefulSetAvailable(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
closeFn, rm, informers, c := scSetup(t)
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-available-pods", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@ -359,8 +363,11 @@ func TestStatefulSetStatusWithPodFail(t *testing.T) {
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ssc := statefulset.NewStatefulSetController(
ctx,
informers.Core().V1().Pods(),
informers.Apps().V1().StatefulSets(),
informers.Core().V1().PersistentVolumeClaims(),
@ -371,8 +378,6 @@ func TestStatefulSetStatusWithPodFail(t *testing.T) {
ns := framework.CreateNamespaceOrDie(c, "test-pod-fail", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go ssc.Run(ctx, 5)
@ -443,7 +448,8 @@ func TestAutodeleteOwnerRefs(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
closeFn, rm, informers, c := scSetup(t)
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(c, "test-autodelete-ownerrefs", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
@ -524,7 +530,8 @@ func TestStatefulSetStartOrdinal(t *testing.T) {
}
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)()
closeFn, rm, informers, c := scSetup(t)
_, ctx := ktesting.NewTestContext(t)
closeFn, rm, informers, c := scSetup(ctx, t)
defer closeFn()
cancel := runControllerAndInformers(rm, informers)
defer cancel()

View File

@ -161,7 +161,7 @@ func newStatefulSetPVC(name string) v1.PersistentVolumeClaim {
}
// scSetup sets up necessities for Statefulset integration test, including control plane, apiserver, informers, and clientset
func scSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
func scSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
@ -174,6 +174,7 @@ func scSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *statefulset.Stat
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod)
sc := statefulset.NewStatefulSetController(
ctx,
informers.Core().V1().Pods(),
informers.Apps().V1().StatefulSets(),
informers.Core().V1().PersistentVolumeClaims(),