diff --git a/cmd/kube-controller-manager/app/apps.go b/cmd/kube-controller-manager/app/apps.go index 7bc73b31651..085dc7c9596 100644 --- a/cmd/kube-controller-manager/app/apps.go +++ b/cmd/kube-controller-manager/app/apps.go @@ -50,7 +50,9 @@ func startDaemonSetController(ctx context.Context, controllerContext ControllerC } func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "statefulset")) go statefulset.NewStatefulSetController( + ctx, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Apps().V1().StatefulSets(), controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(), diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index fa3f34d63bf..5842f6a9895 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -126,7 +126,7 @@ func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps. } if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { // Set PVC policy as much as is possible at this point. - if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil { + if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } @@ -135,7 +135,7 @@ func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps. return err } -func (spc *StatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { +func (spc *StatefulPodControl) UpdateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { attemptedUpdate := false err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { // assume the Pod is consistent @@ -158,11 +158,11 @@ func (spc *StatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1. if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { // if the Pod's PVCs are not consistent with the StatefulSet's PVC deletion policy, update the PVC // and dirty the pod. - if match, err := spc.ClaimsMatchRetentionPolicy(set, pod); err != nil { + if match, err := spc.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } else if !match { - if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil { + if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } @@ -207,7 +207,7 @@ func (spc *StatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1. // ClaimsMatchRetentionPolicy returns false if the PVCs for pod are not consistent with set's PVC deletion policy. // An error is returned if something is not consistent. This is expected if the pod is being otherwise updated, // but a problem otherwise (see usage of this method in UpdateStatefulPod). -func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(set *apps.StatefulSet, pod *v1.Pod) (bool, error) { +func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) (bool, error) { ordinal := getOrdinal(pod) templates := set.Spec.VolumeClaimTemplates for i := range templates { @@ -215,7 +215,7 @@ func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(set *apps.StatefulSet, claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName) switch { case apierrors.IsNotFound(err): - klog.V(4).Infof("Expected claim %s missing, continuing to pick up in next iteration", claimName) + klog.FromContext(ctx).V(4).Info("Expected claim missing, continuing to pick up in next iteration", "PVC", klog.KObj(claim)) case err != nil: return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name) default: @@ -228,7 +228,8 @@ func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(set *apps.StatefulSet, } // UpdatePodClaimForRetentionPolicy updates the PVCs used by pod to match the PVC deletion policy of set. -func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(set *apps.StatefulSet, pod *v1.Pod) error { +func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { + logger := klog.FromContext(ctx) ordinal := getOrdinal(pod) templates := set.Spec.VolumeClaimTemplates for i := range templates { @@ -236,7 +237,7 @@ func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(set *apps.Statef claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName) switch { case apierrors.IsNotFound(err): - klog.V(4).Infof("Expected claim %s missing, continuing to pick up in next iteration.") + logger.V(4).Info("Expected claim missing, continuing to pick up in next iteration", "PVC", klog.KObj(claim)) case err != nil: return fmt.Errorf("Could not retrieve claim %s not found for %s when checking PVC deletion policy: %w", claimName, pod.Name, err) default: @@ -316,14 +317,14 @@ func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulS } // createMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy -func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { +func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { if err := spc.createPersistentVolumeClaims(set, pod); err != nil { return err } if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { // Set PVC policy as much as is possible at this point. - if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil { + if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } diff --git a/pkg/controller/statefulset/stateful_pod_control_test.go b/pkg/controller/statefulset/stateful_pod_control_test.go index 2eaf58bcffa..1cc9c2782c4 100644 --- a/pkg/controller/statefulset/stateful_pod_control_test.go +++ b/pkg/controller/statefulset/stateful_pod_control_test.go @@ -25,7 +25,7 @@ import ( "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2/ktesting" _ "k8s.io/kubernetes/pkg/apis/apps/install" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/features" @@ -241,6 +242,7 @@ func TestStatefulPodControlCreatePodFailed(t *testing.T) { } func TestStatefulPodControlNoOpUpdate(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) @@ -257,7 +259,7 @@ func TestStatefulPodControlNoOpUpdate(t *testing.T) { t.Error("no-op update should not make any client invocation") return true, nil, apierrors.NewInternalError(errors.New("if we are here we have a problem")) }) - if err := control.UpdateStatefulPod(set, pod); err != nil { + if err := control.UpdateStatefulPod(ctx, set, pod); err != nil { t.Errorf("Error returned on no-op update error: %s", err) } events := collectEvents(recorder.Events) @@ -267,6 +269,7 @@ func TestStatefulPodControlNoOpUpdate(t *testing.T) { } func TestStatefulPodControlUpdatesIdentity(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) @@ -281,7 +284,7 @@ func TestStatefulPodControlUpdatesIdentity(t *testing.T) { return true, update.GetObject(), nil }) pod.Name = "goo-0" - if err := control.UpdateStatefulPod(set, pod); err != nil { + if err := control.UpdateStatefulPod(ctx, set, pod); err != nil { t.Errorf("Successful update returned an error: %s", err) } events := collectEvents(recorder.Events) @@ -296,6 +299,7 @@ func TestStatefulPodControlUpdatesIdentity(t *testing.T) { } func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) @@ -313,7 +317,7 @@ func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) { return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) pod.Name = "goo-0" - if err := control.UpdateStatefulPod(set, pod); err == nil { + if err := control.UpdateStatefulPod(ctx, set, pod); err == nil { t.Error("Failed update does not generate an error") } events := collectEvents(recorder.Events) @@ -328,6 +332,7 @@ func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) { } func TestStatefulPodControlUpdatesPodStorage(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) @@ -357,7 +362,7 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) { updated = update.GetObject().(*v1.Pod) return true, update.GetObject(), nil }) - if err := control.UpdateStatefulPod(set, pod); err != nil { + if err := control.UpdateStatefulPod(ctx, set, pod); err != nil { t.Errorf("Successful update returned an error: %s", err) } events := collectEvents(recorder.Events) @@ -375,6 +380,7 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) { } func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) @@ -397,7 +403,7 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) { fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) { return true, nil, apierrors.NewInternalError(errors.New("API server down")) }) - if err := control.UpdateStatefulPod(set, pod); err == nil { + if err := control.UpdateStatefulPod(ctx, set, pod); err == nil { t.Error("Failed Pod storage update did not return an error") } events := collectEvents(recorder.Events) @@ -412,6 +418,7 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) { } func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) recorder := record.NewFakeRecorder(10) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) @@ -440,7 +447,7 @@ func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) { }) pod.Labels[apps.StatefulSetPodNameLabel] = "goo-0" - if err := control.UpdateStatefulPod(set, pod); err != nil { + if err := control.UpdateStatefulPod(ctx, set, pod); err != nil { t.Errorf("Successful update returned an error: %s", err) } events := collectEvents(recorder.Events) @@ -497,6 +504,7 @@ func TestStatefulPodControlDeleteFailure(t *testing.T) { func TestStatefulPodControlClaimsMatchDeletionPolcy(t *testing.T) { // The claimOwnerMatchesSetAndPod is tested exhaustively in stateful_set_utils_test; this // test is for the wiring to the method tested there. + _, ctx := ktesting.NewTestContext(t) fakeClient := &fake.Clientset{} indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) claimLister := corelisters.NewPersistentVolumeClaimLister(indexer) @@ -512,7 +520,7 @@ func TestStatefulPodControlClaimsMatchDeletionPolcy(t *testing.T) { WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType, WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType, } - if matches, err := control.ClaimsMatchRetentionPolicy(set, pod); err != nil { + if matches, err := control.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil { t.Errorf("Unexpected error for ClaimsMatchRetentionPolicy (retain): %v", err) } else if !matches { t.Error("Unexpected non-match for ClaimsMatchRetentionPolicy (retain)") @@ -521,7 +529,7 @@ func TestStatefulPodControlClaimsMatchDeletionPolcy(t *testing.T) { WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType, WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType, } - if matches, err := control.ClaimsMatchRetentionPolicy(set, pod); err != nil { + if matches, err := control.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil { t.Errorf("Unexpected error for ClaimsMatchRetentionPolicy (set deletion): %v", err) } else if matches { t.Error("Unexpected match for ClaimsMatchRetentionPolicy (set deletion)") @@ -532,6 +540,7 @@ func TestStatefulPodControlUpdatePodClaimForRetentionPolicy(t *testing.T) { // All the update conditions are tested exhaustively in stateful_set_utils_test. This // tests the wiring from the pod control to that method. testFn := func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)() fakeClient := &fake.Clientset{} indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) @@ -554,7 +563,7 @@ func TestStatefulPodControlUpdatePodClaimForRetentionPolicy(t *testing.T) { WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType, WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType, } - if err := control.UpdatePodClaimForRetentionPolicy(set, pod); err != nil { + if err := control.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil { t.Errorf("Unexpected error for UpdatePodClaimForRetentionPolicy (retain): %v", err) } expectRef := utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) @@ -681,6 +690,7 @@ func TestPodClaimIsStale(t *testing.T) { func TestStatefulPodControlRetainDeletionPolicyUpdate(t *testing.T) { testFn := func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) recorder := record.NewFakeRecorder(10) set := newStatefulSet(1) set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{ @@ -704,7 +714,7 @@ func TestStatefulPodControlRetainDeletionPolicyUpdate(t *testing.T) { claimIndexer.Add(&claim) } control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder) - if err := control.UpdateStatefulPod(set, pod); err != nil { + if err := control.UpdateStatefulPod(ctx, set, pod); err != nil { t.Errorf("Successful update returned an error: %s", err) } for k := range claims { @@ -735,6 +745,7 @@ func TestStatefulPodControlRetainDeletionPolicyUpdate(t *testing.T) { } func TestStatefulPodControlRetentionPolicyUpdate(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) // Only applicable when the feature gate is on; the off case is tested in TestStatefulPodControlRetainRetentionPolicyUpdate. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)() @@ -766,7 +777,7 @@ func TestStatefulPodControlRetentionPolicyUpdate(t *testing.T) { podLister := corelisters.NewPodLister(podIndexer) claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer) control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder) - if err := control.UpdateStatefulPod(set, pod); err != nil { + if err := control.UpdateStatefulPod(ctx, set, pod); err != nil { t.Errorf("Successful update returned an error: %s", err) } updatedClaim, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name) @@ -783,6 +794,7 @@ func TestStatefulPodControlRetentionPolicyUpdate(t *testing.T) { } func TestStatefulPodControlRetentionPolicyUpdateMissingClaims(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) // Only applicable when the feature gate is on. defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)() @@ -805,7 +817,7 @@ func TestStatefulPodControlRetentionPolicyUpdateMissingClaims(t *testing.T) { return true, update.GetObject(), nil }) control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder) - if err := control.UpdateStatefulPod(set, pod); err != nil { + if err := control.UpdateStatefulPod(ctx, set, pod); err != nil { t.Error("Unexpected error on pod update when PVCs are missing") } claims := getPersistentVolumeClaims(set, pod) @@ -818,7 +830,7 @@ func TestStatefulPodControlRetentionPolicyUpdateMissingClaims(t *testing.T) { claimIndexer.Add(&claim) } - if err := control.UpdateStatefulPod(set, pod); err != nil { + if err := control.UpdateStatefulPod(ctx, set, pod); err != nil { t.Errorf("Expected update to succeed, saw error %v", err) } updatedClaim, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name) diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index bc75933b1ee..6bc307d13e7 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -78,12 +78,14 @@ type StatefulSetController struct { // NewStatefulSetController creates a new statefulset controller. func NewStatefulSetController( + ctx context.Context, podInformer coreinformers.PodInformer, setInformer appsinformers.StatefulSetInformer, pvcInformer coreinformers.PersistentVolumeClaimInformer, revInformer appsinformers.ControllerRevisionInformer, kubeClient clientset.Interface, ) *StatefulSetController { + logger := klog.FromContext(ctx) eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"}) ssc := &StatefulSetController{ @@ -108,11 +110,17 @@ func NewStatefulSetController( podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ // lookup the statefulset and enqueue - AddFunc: ssc.addPod, + AddFunc: func(obj interface{}) { + ssc.addPod(logger, obj) + }, // lookup current and old statefulset if labels changed - UpdateFunc: ssc.updatePod, + UpdateFunc: func(oldObj, newObj interface{}) { + ssc.updatePod(logger, oldObj, newObj) + }, // lookup statefulset accounting for deletion tombstones - DeleteFunc: ssc.deletePod, + DeleteFunc: func(obj interface{}) { + ssc.deletePod(logger, obj) + }, }) ssc.podLister = podInformer.Lister() ssc.podListerSynced = podInformer.Informer().HasSynced @@ -124,7 +132,7 @@ func NewStatefulSetController( oldPS := old.(*apps.StatefulSet) curPS := cur.(*apps.StatefulSet) if oldPS.Status.Replicas != curPS.Status.Replicas { - klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas) + logger.V(4).Info("Observed updated replica count for StatefulSet", "statefulSet", klog.KObj(curPS), "oldReplicas", oldPS.Status.Replicas, "newReplicas", curPS.Status.Replicas) } ssc.enqueueStatefulSet(cur) }, @@ -149,8 +157,9 @@ func (ssc *StatefulSetController) Run(ctx context.Context, workers int) { defer ssc.queue.ShutDown() - klog.Infof("Starting stateful set controller") - defer klog.Infof("Shutting down statefulset controller") + logger := klog.FromContext(ctx) + logger.Info("Starting stateful set controller") + defer logger.Info("Shutting down statefulset controller") if !cache.WaitForNamedCacheSync("stateful set", ctx.Done(), ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) { return @@ -164,13 +173,13 @@ func (ssc *StatefulSetController) Run(ctx context.Context, workers int) { } // addPod adds the statefulset for the pod to the sync queue -func (ssc *StatefulSetController) addPod(obj interface{}) { +func (ssc *StatefulSetController) addPod(logger klog.Logger, obj interface{}) { pod := obj.(*v1.Pod) if pod.DeletionTimestamp != nil { // on a restart of the controller manager, it's possible a new pod shows up in a state that // is already pending deletion. Prevent the pod from being a creation observation. - ssc.deletePod(pod) + ssc.deletePod(logger, pod) return } @@ -180,7 +189,7 @@ func (ssc *StatefulSetController) addPod(obj interface{}) { if set == nil { return } - klog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels) + logger.V(4).Info("Pod created with labels", "pod", klog.KObj(pod), "labels", pod.Labels) ssc.enqueueStatefulSet(set) return } @@ -191,14 +200,14 @@ func (ssc *StatefulSetController) addPod(obj interface{}) { if len(sets) == 0 { return } - klog.V(4).Infof("Orphan Pod %s created, labels: %+v", pod.Name, pod.Labels) + logger.V(4).Info("Orphan Pod created with labels", "pod", klog.KObj(pod), "labels", pod.Labels) for _, set := range sets { ssc.enqueueStatefulSet(set) } } // updatePod adds the statefulset for the current and old pods to the sync queue. -func (ssc *StatefulSetController) updatePod(old, cur interface{}) { +func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interface{}) { curPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) if curPod.ResourceVersion == oldPod.ResourceVersion { @@ -225,13 +234,13 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { if set == nil { return } - klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + logger.V(4).Info("Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta) ssc.enqueueStatefulSet(set) // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in // the Pod status which in turn will trigger a requeue of the owning replica set thus // having its status updated with the newly available replica. if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 { - klog.V(2).Infof("StatefulSet %s will be enqueued after %ds for availability check", set.Name, set.Spec.MinReadySeconds) + logger.V(2).Info("StatefulSet will be enqueued after minReadySeconds for availability check", "statefulSet", klog.KObj(set), "minReadySeconds", set.Spec.MinReadySeconds) // Add a second to avoid milliseconds skew in AddAfter. // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. ssc.enqueueSSAfter(set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second) @@ -246,7 +255,7 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { if len(sets) == 0 { return } - klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + logger.V(4).Info("Orphan Pod objectMeta updated", "pod", klog.KObj(curPod), "oldObjectMeta", oldPod.ObjectMeta, "newObjectMeta", curPod.ObjectMeta) for _, set := range sets { ssc.enqueueStatefulSet(set) } @@ -254,7 +263,7 @@ func (ssc *StatefulSetController) updatePod(old, cur interface{}) { } // deletePod enqueues the statefulset for the pod accounting for deletion tombstones. -func (ssc *StatefulSetController) deletePod(obj interface{}) { +func (ssc *StatefulSetController) deletePod(logger klog.Logger, obj interface{}) { pod, ok := obj.(*v1.Pod) // When a delete is dropped, the relist will notice a pod in the store not @@ -282,7 +291,7 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) { if set == nil { return } - klog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller()) + logger.V(4).Info("Pod deleted.", "pod", klog.KObj(pod), "caller", utilruntime.GetCaller()) ssc.enqueueStatefulSet(set) } @@ -435,8 +444,9 @@ func (ssc *StatefulSetController) worker(ctx context.Context) { // sync syncs the given statefulset. func (ssc *StatefulSetController) sync(ctx context.Context, key string) error { startTime := time.Now() + logger := klog.FromContext(ctx) defer func() { - klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime)) + logger.V(4).Info("Finished syncing statefulset", "key", key, "time", time.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -445,7 +455,7 @@ func (ssc *StatefulSetController) sync(ctx context.Context, key string) error { } set, err := ssc.setLister.StatefulSets(namespace).Get(name) if errors.IsNotFound(err) { - klog.Infof("StatefulSet has been deleted %v", key) + logger.Info("StatefulSet has been deleted", "key", key) return nil } if err != nil { @@ -474,14 +484,15 @@ func (ssc *StatefulSetController) sync(ctx context.Context, key string) error { // syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod). func (ssc *StatefulSetController) syncStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) error { - klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods)) + logger := klog.FromContext(ctx) + logger.V(4).Info("Syncing StatefulSet with pods", "statefulSet", klog.KObj(set), "pods", len(pods)) var status *apps.StatefulSetStatus var err error status, err = ssc.control.UpdateStatefulSet(ctx, set, pods) if err != nil { return err } - klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name) + logger.V(4).Info("Successfully synced StatefulSet", "statefulSet", klog.KObj(set)) // One more sync to handle the clock skew. This is also helping in requeuing right after status update if set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas { ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second) diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 276e41ef09f..c9deaba937d 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -97,6 +97,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set func (ssc *defaultStatefulSetControl) performUpdate( ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) { var currentStatus *apps.StatefulSetStatus + logger := klog.FromContext(ctx) // get the current, and update revisions currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions) if err != nil { @@ -112,7 +113,7 @@ func (ssc *defaultStatefulSetControl) performUpdate( // make sure to update the latest status even if there is an error with non-nil currentStatus statusErr := ssc.updateStatefulSetStatus(ctx, set, currentStatus) if statusErr == nil { - klog.V(4).InfoS("Updated status", "statefulSet", klog.KObj(set), + logger.V(4).Info("Updated status", "statefulSet", klog.KObj(set), "replicas", currentStatus.Replicas, "readyReplicas", currentStatus.ReadyReplicas, "currentReplicas", currentStatus.CurrentReplicas, @@ -129,7 +130,7 @@ func (ssc *defaultStatefulSetControl) performUpdate( return currentRevision, updateRevision, currentStatus, statusErr } - klog.V(4).InfoS("StatefulSet revisions", "statefulSet", klog.KObj(set), + logger.V(4).Info("StatefulSet revisions", "statefulSet", klog.KObj(set), "currentRevision", currentStatus.CurrentRevision, "updateRevision", currentStatus.UpdateRevision) @@ -284,6 +285,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( updateRevision *apps.ControllerRevision, collisionCount int32, pods []*v1.Pod) (*apps.StatefulSetStatus, error) { + logger := klog.FromContext(ctx) // get the current and update revisions of the set. currentSet, err := ApplyRevision(set, currentRevision) if err != nil { @@ -380,7 +382,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } if unhealthy > 0 { - klog.V(4).InfoS("StatefulSet has unhealthy Pods", "statefulSet", klog.KObj(set), "unhealthyReplicas", unhealthy, "pod", klog.KObj(firstUnhealthyPod)) + logger.V(4).Info("StatefulSet has unhealthy Pods", "statefulSet", klog.KObj(set), "unhealthyReplicas", unhealthy, "pod", klog.KObj(firstUnhealthyPod)) } // If the StatefulSet is being deleted, don't do anything other than updating @@ -453,14 +455,14 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( set.Namespace, set.Name, replicas[i].Name) - if err := ssc.podControl.createMissingPersistentVolumeClaims(set, replicas[i]); err != nil { + if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil { return &status, err } } // If we find a Pod that is currently terminating, we must wait until graceful deletion // completes before we continue to make progress. if isTerminating(replicas[i]) && monotonic { - klog.V(4).InfoS("StatefulSet is waiting for Pod to Terminate", + logger.V(4).Info("StatefulSet is waiting for Pod to Terminate", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) return &status, nil } @@ -468,7 +470,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its // ordinal, are Running and Ready. if !isRunningAndReady(replicas[i]) && monotonic { - klog.V(4).InfoS("StatefulSet is waiting for Pod to be Running and Ready", + logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) return &status, nil } @@ -476,7 +478,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its // ordinal, are Available. if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic { - klog.V(4).InfoS("StatefulSet is waiting for Pod to be Available", + logger.V(4).Info("StatefulSet is waiting for Pod to be Available", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) return &status, nil } @@ -484,7 +486,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( retentionMatch := true if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { var err error - retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, replicas[i]) + retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i]) // An error is expected if the pod is not yet fully updated, and so return is treated as matching. if err != nil { retentionMatch = true @@ -495,7 +497,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } // Make a deep copy so we don't mutate the shared cache replica := replicas[i].DeepCopy() - if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil { + if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil { return &status, err } } @@ -503,10 +505,10 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { // Ensure ownerRefs are set correctly for the condemned pods. for i := range condemned { - if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(updateSet, condemned[i]); err != nil { + if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil { return &status, err } else if !matchPolicy { - if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(updateSet, condemned[i]); err != nil { + if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil { return &status, err } } @@ -521,7 +523,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( for target := len(condemned) - 1; target >= 0; target-- { // wait for terminating pods to expire if isTerminating(condemned[target]) { - klog.V(4).InfoS("StatefulSet is waiting for Pod to Terminate prior to scale down", + logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down", "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target])) // block if we are in monotonic mode if monotonic { @@ -531,17 +533,17 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod { - klog.V(4).InfoS("StatefulSet is waiting for Pod to be Running and Ready prior to scale down", + logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down", "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) return &status, nil } // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block. if !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod { - klog.V(4).InfoS("StatefulSet is waiting for Pod to be Available prior to scale down", + logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down", "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) return &status, nil } - klog.V(2).InfoS("Pod of StatefulSet is terminating for scale down", + logger.V(2).Info("Pod of StatefulSet is terminating for scale down", "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target])) if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil { @@ -583,7 +585,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // delete the Pod if it is not already terminating and does not match the update revision. if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) { - klog.V(2).InfoS("Pod of StatefulSet is terminating for update", + logger.V(2).Info("Pod of StatefulSet is terminating for update", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target])) if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil { if !errors.IsNotFound(err) { @@ -596,7 +598,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // wait for unhealthy Pods on update if !isHealthy(replicas[target]) { - klog.V(4).InfoS("StatefulSet is waiting for Pod to update", + logger.V(4).Info("StatefulSet is waiting for Pod to update", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target])) return &status, nil } @@ -614,6 +616,7 @@ func updateStatefulSetAfterInvariantEstablished( status apps.StatefulSetStatus, ) (*apps.StatefulSetStatus, error) { + logger := klog.FromContext(ctx) replicaCount := int(*set.Spec.Replicas) // we compute the minimum ordinal of the target sequence for a destructive update based on the strategy. @@ -644,11 +647,10 @@ func updateStatefulSetAfterInvariantEstablished( } if unavailablePods >= maxUnavailable { - klog.V(2).Infof("StatefulSet %s/%s found unavailablePods %v, more than or equal to allowed maxUnavailable %v", - set.Namespace, - set.Name, - unavailablePods, - maxUnavailable) + logger.V(2).Info("StatefulSet found unavailablePods, more than or equal to allowed maxUnavailable", + "statefulSet", klog.KObj(set), + "unavailablePods", unavailablePods, + "maxUnavailable", maxUnavailable) return &status, nil } @@ -662,10 +664,9 @@ func updateStatefulSetAfterInvariantEstablished( // delete the Pod if it is healthy and the revision doesnt match the target if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) { // delete the Pod if it is healthy and the revision doesnt match the target - klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update", - set.Namespace, - set.Name, - replicas[target].Name) + logger.V(2).Info("StatefulSet terminating Pod for update", + "statefulSet", klog.KObj(set), + "pod", klog.KObj(replicas[target])) if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil { if !errors.IsNotFound(err) { return &status, err diff --git a/pkg/controller/statefulset/stateful_set_test.go b/pkg/controller/statefulset/stateful_set_test.go index 1f2e98614a0..a3934a3194b 100644 --- a/pkg/controller/statefulset/stateful_set_test.go +++ b/pkg/controller/statefulset/stateful_set_test.go @@ -39,6 +39,8 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/history" "k8s.io/kubernetes/pkg/features" @@ -50,8 +52,9 @@ func alwaysReady() bool { return true } func TestStatefulSetControllerCreates(t *testing.T) { set := newStatefulSet(3) - ssc, spc, om, _ := newFakeStatefulSetController(set) - if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil { + logger, ctx := ktesting.NewTestContext(t) + ssc, spc, om, _ := newFakeStatefulSetController(ctx, set) + if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } if obj, _, err := om.setsIndexer.Get(set); err != nil { @@ -66,8 +69,9 @@ func TestStatefulSetControllerCreates(t *testing.T) { func TestStatefulSetControllerDeletes(t *testing.T) { set := newStatefulSet(3) - ssc, spc, om, _ := newFakeStatefulSetController(set) - if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil { + logger, ctx := ktesting.NewTestContext(t) + ssc, spc, om, _ := newFakeStatefulSetController(ctx, set) + if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } if obj, _, err := om.setsIndexer.Get(set); err != nil { @@ -79,7 +83,7 @@ func TestStatefulSetControllerDeletes(t *testing.T) { t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas) } *set.Spec.Replicas = 0 - if err := scaleDownStatefulSetController(set, ssc, spc, om); err != nil { + if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil { t.Errorf("Failed to turn down StatefulSet : %s", err) } if obj, _, err := om.setsIndexer.Get(set); err != nil { @@ -94,8 +98,9 @@ func TestStatefulSetControllerDeletes(t *testing.T) { func TestStatefulSetControllerRespectsTermination(t *testing.T) { set := newStatefulSet(3) - ssc, spc, om, _ := newFakeStatefulSetController(set) - if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil { + logger, ctx := ktesting.NewTestContext(t) + ssc, spc, om, _ := newFakeStatefulSetController(ctx, set) + if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } if obj, _, err := om.setsIndexer.Get(set); err != nil { @@ -114,7 +119,7 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) { if err != nil { t.Error(err) } - ssc.syncStatefulSet(context.TODO(), set, pods) + ssc.syncStatefulSet(ctx, set, pods) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { t.Error(err) @@ -130,7 +135,7 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) { spc.DeleteStatefulPod(set, pods[3]) spc.DeleteStatefulPod(set, pods[4]) *set.Spec.Replicas = 0 - if err := scaleDownStatefulSetController(set, ssc, spc, om); err != nil { + if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil { t.Errorf("Failed to turn down StatefulSet : %s", err) } if obj, _, err := om.setsIndexer.Get(set); err != nil { @@ -144,9 +149,10 @@ func TestStatefulSetControllerRespectsTermination(t *testing.T) { } func TestStatefulSetControllerBlocksScaling(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) set := newStatefulSet(3) - ssc, spc, om, _ := newFakeStatefulSetController(set) - if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil { + ssc, spc, om, _ := newFakeStatefulSetController(ctx, set) + if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil { t.Errorf("Failed to turn up StatefulSet : %s", err) } if obj, _, err := om.setsIndexer.Get(set); err != nil { @@ -191,9 +197,10 @@ func TestStatefulSetControllerBlocksScaling(t *testing.T) { } func TestStatefulSetControllerDeletionTimestamp(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) set := newStatefulSet(3) set.DeletionTimestamp = new(metav1.Time) - ssc, _, om, _ := newFakeStatefulSetController(set) + ssc, _, om, _ := newFakeStatefulSetController(ctx, set) om.setsIndexer.Add(set) @@ -215,10 +222,11 @@ func TestStatefulSetControllerDeletionTimestamp(t *testing.T) { } func TestStatefulSetControllerDeletionTimestampRace(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) set := newStatefulSet(3) // The bare client says it IS deleted. set.DeletionTimestamp = new(metav1.Time) - ssc, _, om, ssh := newFakeStatefulSetController(set) + ssc, _, om, ssh := newFakeStatefulSetController(ctx, set) // The lister (cache) says it's NOT deleted. set2 := *set @@ -279,7 +287,8 @@ func TestStatefulSetControllerDeletionTimestampRace(t *testing.T) { } func TestStatefulSetControllerAddPod(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set1 := newStatefulSet(3) set2 := newStatefulSet(3) pod1 := newStatefulSetPod(set1, 0) @@ -287,7 +296,7 @@ func TestStatefulSetControllerAddPod(t *testing.T) { om.setsIndexer.Add(set1) om.setsIndexer.Add(set2) - ssc.addPod(pod1) + ssc.addPod(logger, pod1) key, done := ssc.queue.Get() if key == nil || done { t.Error("failed to enqueue StatefulSet") @@ -298,7 +307,7 @@ func TestStatefulSetControllerAddPod(t *testing.T) { } ssc.queue.Done(key) - ssc.addPod(pod2) + ssc.addPod(logger, pod2) key, done = ssc.queue.Get() if key == nil || done { t.Error("failed to enqueue StatefulSet") @@ -311,7 +320,8 @@ func TestStatefulSetControllerAddPod(t *testing.T) { } func TestStatefulSetControllerAddPodOrphan(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set1 := newStatefulSet(3) set2 := newStatefulSet(3) set2.Name = "foo2" @@ -325,17 +335,18 @@ func TestStatefulSetControllerAddPodOrphan(t *testing.T) { // Make pod an orphan. Expect matching sets to be queued. pod.OwnerReferences = nil - ssc.addPod(pod) + ssc.addPod(logger, pod) if got, want := ssc.queue.Len(), 2; got != want { t.Errorf("queue.Len() = %v, want %v", got, want) } } func TestStatefulSetControllerAddPodNoSet(t *testing.T) { - ssc, _, _, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, _, _ := newFakeStatefulSetController(ctx) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) - ssc.addPod(pod) + ssc.addPod(logger, pod) ssc.queue.ShutDown() key, _ := ssc.queue.Get() if key != nil { @@ -344,7 +355,8 @@ func TestStatefulSetControllerAddPodNoSet(t *testing.T) { } func TestStatefulSetControllerUpdatePod(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set1 := newStatefulSet(3) set2 := newStatefulSet(3) set2.Name = "foo2" @@ -355,7 +367,7 @@ func TestStatefulSetControllerUpdatePod(t *testing.T) { prev := *pod1 fakeResourceVersion(pod1) - ssc.updatePod(&prev, pod1) + ssc.updatePod(logger, &prev, pod1) key, done := ssc.queue.Get() if key == nil || done { t.Error("failed to enqueue StatefulSet") @@ -367,7 +379,7 @@ func TestStatefulSetControllerUpdatePod(t *testing.T) { prev = *pod2 fakeResourceVersion(pod2) - ssc.updatePod(&prev, pod2) + ssc.updatePod(logger, &prev, pod2) key, done = ssc.queue.Get() if key == nil || done { t.Error("failed to enqueue StatefulSet") @@ -379,12 +391,13 @@ func TestStatefulSetControllerUpdatePod(t *testing.T) { } func TestStatefulSetControllerUpdatePodWithNoSet(t *testing.T) { - ssc, _, _, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, _, _ := newFakeStatefulSetController(ctx) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) prev := *pod fakeResourceVersion(pod) - ssc.updatePod(&prev, pod) + ssc.updatePod(logger, &prev, pod) ssc.queue.ShutDown() key, _ := ssc.queue.Get() if key != nil { @@ -393,11 +406,12 @@ func TestStatefulSetControllerUpdatePodWithNoSet(t *testing.T) { } func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) om.setsIndexer.Add(set) - ssc.updatePod(pod, pod) + ssc.updatePod(logger, pod, pod) ssc.queue.ShutDown() key, _ := ssc.queue.Get() if key != nil { @@ -406,7 +420,8 @@ func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) { } func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) pod.OwnerReferences = nil @@ -417,14 +432,15 @@ func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) { clone := *pod clone.Labels = map[string]string{"foo2": "bar2"} fakeResourceVersion(&clone) - ssc.updatePod(&clone, pod) + ssc.updatePod(logger, &clone, pod) if got, want := ssc.queue.Len(), 2; got != want { t.Errorf("queue.Len() = %v, want %v", got, want) } } func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set := newStatefulSet(3) set2 := newStatefulSet(3) set2.Name = "foo2" @@ -435,14 +451,15 @@ func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) { clone := *pod clone.OwnerReferences = pod2.OwnerReferences fakeResourceVersion(&clone) - ssc.updatePod(&clone, pod) + ssc.updatePod(logger, &clone, pod) if got, want := ssc.queue.Len(), 2; got != want { t.Errorf("queue.Len() = %v, want %v", got, want) } } func TestStatefulSetControllerUpdatePodRelease(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set := newStatefulSet(3) set2 := newStatefulSet(3) set2.Name = "foo2" @@ -452,14 +469,15 @@ func TestStatefulSetControllerUpdatePodRelease(t *testing.T) { clone := *pod clone.OwnerReferences = nil fakeResourceVersion(&clone) - ssc.updatePod(pod, &clone) + ssc.updatePod(logger, pod, &clone) if got, want := ssc.queue.Len(), 2; got != want { t.Errorf("queue.Len() = %v, want %v", got, want) } } func TestStatefulSetControllerDeletePod(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set1 := newStatefulSet(3) set2 := newStatefulSet(3) set2.Name = "foo2" @@ -468,7 +486,7 @@ func TestStatefulSetControllerDeletePod(t *testing.T) { om.setsIndexer.Add(set1) om.setsIndexer.Add(set2) - ssc.deletePod(pod1) + ssc.deletePod(logger, pod1) key, done := ssc.queue.Get() if key == nil || done { t.Error("failed to enqueue StatefulSet") @@ -478,7 +496,7 @@ func TestStatefulSetControllerDeletePod(t *testing.T) { t.Errorf("expected StatefulSet key %s found %s", expectedKey, key) } - ssc.deletePod(pod2) + ssc.deletePod(logger, pod2) key, done = ssc.queue.Get() if key == nil || done { t.Error("failed to enqueue StatefulSet") @@ -490,7 +508,8 @@ func TestStatefulSetControllerDeletePod(t *testing.T) { } func TestStatefulSetControllerDeletePodOrphan(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set1 := newStatefulSet(3) set2 := newStatefulSet(3) set2.Name = "foo2" @@ -499,20 +518,21 @@ func TestStatefulSetControllerDeletePodOrphan(t *testing.T) { om.setsIndexer.Add(set2) pod1.OwnerReferences = nil - ssc.deletePod(pod1) + ssc.deletePod(logger, pod1) if got, want := ssc.queue.Len(), 0; got != want { t.Errorf("queue.Len() = %v, want %v", got, want) } } func TestStatefulSetControllerDeletePodTombstone(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + logger, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set := newStatefulSet(3) pod := newStatefulSetPod(set, 0) om.setsIndexer.Add(set) tombstoneKey, _ := controller.KeyFunc(pod) tombstone := cache.DeletedFinalStateUnknown{Key: tombstoneKey, Obj: pod} - ssc.deletePod(tombstone) + ssc.deletePod(logger, tombstone) key, done := ssc.queue.Get() if key == nil || done { t.Error("failed to enqueue StatefulSet") @@ -524,7 +544,8 @@ func TestStatefulSetControllerDeletePodTombstone(t *testing.T) { } func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) { - ssc, _, om, _ := newFakeStatefulSetController() + _, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx) set1 := newStatefulSet(3) set2 := newStatefulSet(3) set2.Name = "foo2" @@ -553,7 +574,8 @@ func TestGetPodsForStatefulSetAdopt(t *testing.T) { pod4.OwnerReferences = nil pod4.Name = "x" + pod4.Name - ssc, _, om, _ := newFakeStatefulSetController(set, pod1, pod2, pod3, pod4) + _, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx, set, pod1, pod2, pod3, pod4) om.podsIndexer.Add(pod1) om.podsIndexer.Add(pod2) @@ -595,7 +617,8 @@ func TestAdoptOrphanRevisions(t *testing.T) { ss1Rev2.Namespace = ss1.Namespace ss1Rev2.OwnerReferences = []metav1.OwnerReference{} - ssc, _, om, _ := newFakeStatefulSetController(ss1, ss1Rev1, ss1Rev2) + _, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx, ss1, ss1Rev1, ss1Rev2) om.revisionsIndexer.Add(ss1Rev1) om.revisionsIndexer.Add(ss1Rev2) @@ -621,8 +644,9 @@ func TestAdoptOrphanRevisions(t *testing.T) { } func TestGetPodsForStatefulSetRelease(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) set := newStatefulSet(3) - ssc, _, om, _ := newFakeStatefulSetController(set) + ssc, _, om, _ := newFakeStatefulSetController(ctx, set) pod1 := newStatefulSetPod(set, 1) // pod2 is owned but has wrong name. pod2 := newStatefulSetPod(set, 2) @@ -667,7 +691,8 @@ func TestOrphanedPodsWithPVCDeletePolicy(t *testing.T) { *set.Spec.Replicas = 2 set.Spec.PersistentVolumeClaimRetentionPolicy.WhenScaled = scaledownPolicy set.Spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted = deletionPolicy - ssc, _, om, _ := newFakeStatefulSetController(set) + _, ctx := ktesting.NewTestContext(t) + ssc, _, om, _ := newFakeStatefulSetController(ctx, set) om.setsIndexer.Add(set) pods := []*v1.Pod{} @@ -811,8 +836,9 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) { } set := newStatefulSet(3) set.Spec.PersistentVolumeClaimRetentionPolicy = policy - ssc, spc, om, _ := newFakeStatefulSetController(set) - if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil { + logger, ctx := ktesting.NewTestContext(t) + ssc, spc, om, _ := newFakeStatefulSetController(ctx, set) + if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil { t.Errorf(onPolicy("Failed to turn up StatefulSet : %s", err)) } var err error @@ -823,7 +849,7 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) { t.Errorf(onPolicy("set.Status.Replicas = %v; want 3", set.Status.Replicas)) } *set.Spec.Replicas = 2 - if err := scaleDownStatefulSetController(set, ssc, spc, om); err != nil { + if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil { t.Errorf(onPolicy("Failed to scale down StatefulSet : msg, %s", err)) } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -857,7 +883,7 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) { *set.Spec.Replicas = 3 // Until the stale PVC goes away, the scale up should never finish. Run 10 iterations, then delete the PVC. - if err := scaleUpStatefulSetControllerBounded(set, ssc, spc, om, 10); err != nil { + if err := scaleUpStatefulSetControllerBounded(logger, set, ssc, spc, om, 10); err != nil { t.Errorf(onPolicy("Failed attempt to scale StatefulSet back up: %v", err)) } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -890,7 +916,7 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) { t.Errorf(onPolicy("Could not delete stale pvc: %v", err)) } - if err := scaleUpStatefulSetController(set, ssc, spc, om); err != nil { + if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil { t.Errorf(onPolicy("Failed to scale StatefulSet back up: %v", err)) } set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) @@ -903,13 +929,14 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) { } } -func newFakeStatefulSetController(initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) { +func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) { client := fake.NewSimpleClientset(initialObjects...) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) om := newFakeObjectManager(informerFactory) spc := NewStatefulPodControlFromManager(om, &noopRecorder{}) ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets()) ssc := NewStatefulSetController( + ctx, informerFactory.Core().V1().Pods(), informerFactory.Apps().V1().StatefulSets(), informerFactory.Core().V1().PersistentVolumeClaims(), @@ -940,11 +967,11 @@ func getPodAtOrdinal(pods []*v1.Pod, ordinal int) *v1.Pod { return pods[ordinal] } -func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error { - return scaleUpStatefulSetControllerBounded(set, ssc, spc, om, -1) +func scaleUpStatefulSetController(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error { + return scaleUpStatefulSetControllerBounded(logger, set, ssc, spc, om, -1) } -func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager, maxIterations int) error { +func scaleUpStatefulSetControllerBounded(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager, maxIterations int) error { om.setsIndexer.Add(set) ssc.enqueueStatefulSet(set) fakeWorker(ssc) @@ -964,7 +991,7 @@ func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSet return err } pod := getPodAtOrdinal(pods, ord) - ssc.addPod(pod) + ssc.addPod(logger, pod) fakeWorker(ssc) pod = getPodAtOrdinal(pods, ord) prev := *pod @@ -972,7 +999,7 @@ func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSet return err } pod = getPodAtOrdinal(pods, ord) - ssc.updatePod(&prev, pod) + ssc.updatePod(logger, &prev, pod) fakeWorker(ssc) pod = getPodAtOrdinal(pods, ord) prev = *pod @@ -980,7 +1007,7 @@ func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSet return err } pod = getPodAtOrdinal(pods, ord) - ssc.updatePod(&prev, pod) + ssc.updatePod(logger, &prev, pod) fakeWorker(ssc) if err := assertMonotonicInvariants(set, om); err != nil { return err @@ -995,7 +1022,7 @@ func scaleUpStatefulSetControllerBounded(set *apps.StatefulSet, ssc *StatefulSet return assertMonotonicInvariants(set, om) } -func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error { +func scaleDownStatefulSetController(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error { selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { return err @@ -1016,10 +1043,10 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr return err } pod = getPodAtOrdinal(pods, ord) - ssc.updatePod(&prev, pod) + ssc.updatePod(logger, &prev, pod) fakeWorker(ssc) spc.DeleteStatefulPod(set, pod) - ssc.deletePod(pod) + ssc.deletePod(logger, pod) fakeWorker(ssc) for set.Status.Replicas > *set.Spec.Replicas { pods, err = om.podsLister.Pods(set.Namespace).List(selector) @@ -1033,10 +1060,10 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr return err } pod = getPodAtOrdinal(pods, ord) - ssc.updatePod(&prev, pod) + ssc.updatePod(logger, &prev, pod) fakeWorker(ssc) spc.DeleteStatefulPod(set, pod) - ssc.deletePod(pod) + ssc.deletePod(logger, pod) fakeWorker(ssc) obj, _, err := om.setsIndexer.Get(set) if err != nil { diff --git a/test/integration/statefulset/statefulset_test.go b/test/integration/statefulset/statefulset_test.go index f082bde28d8..5d0409327ea 100644 --- a/test/integration/statefulset/statefulset_test.go +++ b/test/integration/statefulset/statefulset_test.go @@ -36,6 +36,7 @@ import ( clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2/ktesting" apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/statefulset" @@ -123,7 +124,8 @@ func TestVolumeTemplateNoopUpdate(t *testing.T) { } func TestSpecReplicasChange(t *testing.T) { - closeFn, rm, informers, c := scSetup(t) + _, ctx := ktesting.NewTestContext(t) + closeFn, rm, informers, c := scSetup(ctx, t) defer closeFn() ns := framework.CreateNamespaceOrDie(c, "test-spec-replicas-change", t) defer framework.DeleteNamespaceOrDie(c, ns, t) @@ -166,7 +168,8 @@ func TestSpecReplicasChange(t *testing.T) { } func TestDeletingAndFailedPods(t *testing.T) { - closeFn, rm, informers, c := scSetup(t) + _, ctx := ktesting.NewTestContext(t) + closeFn, rm, informers, c := scSetup(ctx, t) defer closeFn() ns := framework.CreateNamespaceOrDie(c, "test-deleting-and-failed-pods", t) defer framework.DeleteNamespaceOrDie(c, ns, t) @@ -257,7 +260,8 @@ func TestStatefulSetAvailable(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - closeFn, rm, informers, c := scSetup(t) + _, ctx := ktesting.NewTestContext(t) + closeFn, rm, informers, c := scSetup(ctx, t) defer closeFn() ns := framework.CreateNamespaceOrDie(c, "test-available-pods", t) defer framework.DeleteNamespaceOrDie(c, ns, t) @@ -359,8 +363,11 @@ func TestStatefulSetStatusWithPodFail(t *testing.T) { resyncPeriod := 12 * time.Hour informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod) - + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() ssc := statefulset.NewStatefulSetController( + ctx, informers.Core().V1().Pods(), informers.Apps().V1().StatefulSets(), informers.Core().V1().PersistentVolumeClaims(), @@ -371,8 +378,6 @@ func TestStatefulSetStatusWithPodFail(t *testing.T) { ns := framework.CreateNamespaceOrDie(c, "test-pod-fail", t) defer framework.DeleteNamespaceOrDie(c, ns, t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() informers.Start(ctx.Done()) go ssc.Run(ctx, 5) @@ -443,7 +448,8 @@ func TestAutodeleteOwnerRefs(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)() - closeFn, rm, informers, c := scSetup(t) + _, ctx := ktesting.NewTestContext(t) + closeFn, rm, informers, c := scSetup(ctx, t) defer closeFn() ns := framework.CreateNamespaceOrDie(c, "test-autodelete-ownerrefs", t) defer framework.DeleteNamespaceOrDie(c, ns, t) @@ -524,7 +530,8 @@ func TestStatefulSetStartOrdinal(t *testing.T) { } defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)() - closeFn, rm, informers, c := scSetup(t) + _, ctx := ktesting.NewTestContext(t) + closeFn, rm, informers, c := scSetup(ctx, t) defer closeFn() cancel := runControllerAndInformers(rm, informers) defer cancel() diff --git a/test/integration/statefulset/util.go b/test/integration/statefulset/util.go index 8ab42e8ca4d..896994b042c 100644 --- a/test/integration/statefulset/util.go +++ b/test/integration/statefulset/util.go @@ -161,7 +161,7 @@ func newStatefulSetPVC(name string) v1.PersistentVolumeClaim { } // scSetup sets up necessities for Statefulset integration test, including control plane, apiserver, informers, and clientset -func scSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) { +func scSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) @@ -174,6 +174,7 @@ func scSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *statefulset.Stat informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod) sc := statefulset.NewStatefulSetController( + ctx, informers.Core().V1().Pods(), informers.Apps().V1().StatefulSets(), informers.Core().V1().PersistentVolumeClaims(),