diff --git a/cmd/kube-controller-manager/app/apps.go b/cmd/kube-controller-manager/app/apps.go index 085dc7c9596..ced905a6ad4 100644 --- a/cmd/kube-controller-manager/app/apps.go +++ b/cmd/kube-controller-manager/app/apps.go @@ -74,7 +74,9 @@ func startReplicaSetController(ctx context.Context, controllerContext Controller } func startDeploymentController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "deployment")) dc, err := deployment.NewDeploymentController( + ctx, controllerContext.InformerFactory.Apps().V1().Deployments(), controllerContext.InformerFactory.Apps().V1().ReplicaSets(), controllerContext.InformerFactory.Core().V1().Pods(), diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 4a2aa5adbe0..c08dd0c1908 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -26,8 +26,6 @@ import ( "reflect" "time" - "k8s.io/klog/v2" - apps "k8s.io/api/apps/v1" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -46,6 +44,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/deployment/util" ) @@ -99,9 +98,9 @@ type DeploymentController struct { } // NewDeploymentController creates a new DeploymentController. -func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) { +func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) { eventBroadcaster := record.NewBroadcaster() - + logger := klog.FromContext(ctx) dc := &DeploymentController{ client: client, eventBroadcaster: eventBroadcaster, @@ -114,18 +113,32 @@ func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInfor } dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: dc.addDeployment, - UpdateFunc: dc.updateDeployment, + AddFunc: func(obj interface{}) { + dc.addDeployment(logger, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + dc.updateDeployment(logger, oldObj, newObj) + }, // This will enter the sync loop and no-op, because the deployment has been deleted from the store. - DeleteFunc: dc.deleteDeployment, + DeleteFunc: func(obj interface{}) { + dc.deleteDeployment(logger, obj) + }, }) rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: dc.addReplicaSet, - UpdateFunc: dc.updateReplicaSet, - DeleteFunc: dc.deleteReplicaSet, + AddFunc: func(obj interface{}) { + dc.addReplicaSet(logger, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + dc.updateReplicaSet(logger, oldObj, newObj) + }, + DeleteFunc: func(obj interface{}) { + dc.deleteReplicaSet(logger, obj) + }, }) podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - DeleteFunc: dc.deletePod, + DeleteFunc: func(obj interface{}) { + dc.deletePod(logger, obj) + }, }) dc.syncHandler = dc.syncDeployment @@ -151,8 +164,9 @@ func (dc *DeploymentController) Run(ctx context.Context, workers int) { defer dc.queue.ShutDown() - klog.InfoS("Starting controller", "controller", "deployment") - defer klog.InfoS("Shutting down controller", "controller", "deployment") + logger := klog.FromContext(ctx) + logger.Info("Starting controller", "controller", "deployment") + defer logger.Info("Shutting down controller", "controller", "deployment") if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { return @@ -165,20 +179,20 @@ func (dc *DeploymentController) Run(ctx context.Context, workers int) { <-ctx.Done() } -func (dc *DeploymentController) addDeployment(obj interface{}) { +func (dc *DeploymentController) addDeployment(logger klog.Logger, obj interface{}) { d := obj.(*apps.Deployment) - klog.V(4).InfoS("Adding deployment", "deployment", klog.KObj(d)) + logger.V(4).Info("Adding deployment", "deployment", klog.KObj(d)) dc.enqueueDeployment(d) } -func (dc *DeploymentController) updateDeployment(old, cur interface{}) { +func (dc *DeploymentController) updateDeployment(logger klog.Logger, old, cur interface{}) { oldD := old.(*apps.Deployment) curD := cur.(*apps.Deployment) - klog.V(4).InfoS("Updating deployment", "deployment", klog.KObj(oldD)) + logger.V(4).Info("Updating deployment", "deployment", klog.KObj(oldD)) dc.enqueueDeployment(curD) } -func (dc *DeploymentController) deleteDeployment(obj interface{}) { +func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) { d, ok := obj.(*apps.Deployment) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) @@ -192,39 +206,38 @@ func (dc *DeploymentController) deleteDeployment(obj interface{}) { return } } - klog.V(4).InfoS("Deleting deployment", "deployment", klog.KObj(d)) + logger.V(4).Info("Deleting deployment", "deployment", klog.KObj(d)) dc.enqueueDeployment(d) } // addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created. -func (dc *DeploymentController) addReplicaSet(obj interface{}) { +func (dc *DeploymentController) addReplicaSet(logger klog.Logger, obj interface{}) { rs := obj.(*apps.ReplicaSet) if rs.DeletionTimestamp != nil { // On a restart of the controller manager, it's possible for an object to // show up in a state that is already pending deletion. - dc.deleteReplicaSet(rs) + dc.deleteReplicaSet(logger, rs) return } - // If it has a ControllerRef, that's all that matters. if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil { d := dc.resolveControllerRef(rs.Namespace, controllerRef) if d == nil { return } - klog.V(4).InfoS("ReplicaSet added", "replicaSet", klog.KObj(rs)) + logger.V(4).Info("ReplicaSet added", "replicaSet", klog.KObj(rs)) dc.enqueueDeployment(d) return } // Otherwise, it's an orphan. Get a list of all matching Deployments and sync // them to see if anyone wants to adopt it. - ds := dc.getDeploymentsForReplicaSet(rs) + ds := dc.getDeploymentsForReplicaSet(logger, rs) if len(ds) == 0 { return } - klog.V(4).InfoS("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs)) + logger.V(4).Info("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs)) for _, d := range ds { dc.enqueueDeployment(d) } @@ -232,7 +245,7 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) { // getDeploymentsForReplicaSet returns a list of Deployments that potentially // match a ReplicaSet. -func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *apps.ReplicaSet) []*apps.Deployment { +func (dc *DeploymentController) getDeploymentsForReplicaSet(logger klog.Logger, rs *apps.ReplicaSet) []*apps.Deployment { deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs) if err != nil || len(deployments) == 0 { return nil @@ -244,7 +257,7 @@ func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *apps.ReplicaSet) if len(deployments) > 1 { // ControllerRef will ensure we don't do anything crazy, but more than one // item in this list nevertheless constitutes user error. - klog.V(4).InfoS("user error! more than one deployment is selecting replica set", + logger.V(4).Info("user error! more than one deployment is selecting replica set", "replicaSet", klog.KObj(rs), "labels", rs.Labels, "deployment", klog.KObj(deployments[0])) } return deployments @@ -254,7 +267,7 @@ func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *apps.ReplicaSet) // is updated and wake them up. If the anything of the ReplicaSets have changed, we need to // awaken both the old and new deployments. old and cur must be *apps.ReplicaSet // types. -func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) { +func (dc *DeploymentController) updateReplicaSet(logger klog.Logger, old, cur interface{}) { curRS := cur.(*apps.ReplicaSet) oldRS := old.(*apps.ReplicaSet) if curRS.ResourceVersion == oldRS.ResourceVersion { @@ -272,14 +285,13 @@ func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) { dc.enqueueDeployment(d) } } - // If it has a ControllerRef, that's all that matters. if curControllerRef != nil { d := dc.resolveControllerRef(curRS.Namespace, curControllerRef) if d == nil { return } - klog.V(4).InfoS("ReplicaSet updated", "replicaSet", klog.KObj(curRS)) + logger.V(4).Info("ReplicaSet updated", "replicaSet", klog.KObj(curRS)) dc.enqueueDeployment(d) return } @@ -288,11 +300,11 @@ func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) { // to see if anyone wants to adopt it now. labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels) if labelChanged || controllerRefChanged { - ds := dc.getDeploymentsForReplicaSet(curRS) + ds := dc.getDeploymentsForReplicaSet(logger, curRS) if len(ds) == 0 { return } - klog.V(4).InfoS("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS)) + logger.V(4).Info("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS)) for _, d := range ds { dc.enqueueDeployment(d) } @@ -302,7 +314,7 @@ func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) { // deleteReplicaSet enqueues the deployment that manages a ReplicaSet when // the ReplicaSet is deleted. obj could be an *apps.ReplicaSet, or // a DeletionFinalStateUnknown marker item. -func (dc *DeploymentController) deleteReplicaSet(obj interface{}) { +func (dc *DeploymentController) deleteReplicaSet(logger klog.Logger, obj interface{}) { rs, ok := obj.(*apps.ReplicaSet) // When a delete is dropped, the relist will notice a pod in the store not @@ -331,12 +343,12 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) { if d == nil { return } - klog.V(4).InfoS("ReplicaSet deleted", "replicaSet", klog.KObj(rs)) + logger.V(4).Info("ReplicaSet deleted", "replicaSet", klog.KObj(rs)) dc.enqueueDeployment(d) } // deletePod will enqueue a Recreate Deployment once all of its pods have stopped running. -func (dc *DeploymentController) deletePod(obj interface{}) { +func (dc *DeploymentController) deletePod(logger klog.Logger, obj interface{}) { pod, ok := obj.(*v1.Pod) // When a delete is dropped, the relist will notice a pod in the store not @@ -355,8 +367,8 @@ func (dc *DeploymentController) deletePod(obj interface{}) { return } } - klog.V(4).InfoS("Pod deleted", "pod", klog.KObj(pod)) - if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType { + logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod)) + if d := dc.getDeploymentForPod(logger, pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType { // Sync if this Deployment now has no more Pods. rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1())) if err != nil { @@ -408,7 +420,7 @@ func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after } // getDeploymentForPod returns the deployment managing the given Pod. -func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *apps.Deployment { +func (dc *DeploymentController) getDeploymentForPod(logger klog.Logger, pod *v1.Pod) *apps.Deployment { // Find the owning replica set var rs *apps.ReplicaSet var err error @@ -423,7 +435,7 @@ func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *apps.Deploymen } rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name) if err != nil || rs.UID != controllerRef.UID { - klog.V(4).InfoS("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err) + logger.V(4).Info("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err) return nil } @@ -471,30 +483,30 @@ func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool { defer dc.queue.Done(key) err := dc.syncHandler(ctx, key.(string)) - dc.handleErr(err, key) + dc.handleErr(ctx, err, key) return true } -func (dc *DeploymentController) handleErr(err error, key interface{}) { +func (dc *DeploymentController) handleErr(ctx context.Context, err error, key interface{}) { + logger := klog.FromContext(ctx) if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { dc.queue.Forget(key) return } - ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string)) if keyErr != nil { - klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key) + logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key) } if dc.queue.NumRequeues(key) < maxRetries { - klog.V(2).InfoS("Error syncing deployment", "deployment", klog.KRef(ns, name), "err", err) + logger.V(2).Info("Error syncing deployment", "deployment", klog.KRef(ns, name), "err", err) dc.queue.AddRateLimited(key) return } utilruntime.HandleError(err) - klog.V(2).InfoS("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err) + logger.V(2).Info("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err) dc.queue.Forget(key) } @@ -567,6 +579,7 @@ func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsLis // syncDeployment will sync the deployment with the given key. // This function is not meant to be invoked concurrently with the same key. func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error { + logger := klog.FromContext(ctx) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key) @@ -574,14 +587,14 @@ func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) } startTime := time.Now() - klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime) + logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime) defer func() { - klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime)) + logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime)) }() deployment, err := dc.dLister.Deployments(namespace).Get(name) if errors.IsNotFound(err) { - klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name)) + logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name)) return nil } if err != nil { diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 45b7a005ad9..05f36c26703 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -34,6 +34,8 @@ import ( "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" _ "k8s.io/kubernetes/pkg/apis/apps/install" _ "k8s.io/kubernetes/pkg/apis/authentication/install" _ "k8s.io/kubernetes/pkg/apis/authorization/install" @@ -181,10 +183,10 @@ func newFixture(t testing.TB) *fixture { return f } -func (f *fixture) newController() (*DeploymentController, informers.SharedInformerFactory, error) { +func (f *fixture) newController(ctx context.Context) (*DeploymentController, informers.SharedInformerFactory, error) { f.client = fake.NewSimpleClientset(f.objects...) informers := informers.NewSharedInformerFactory(f.client, controller.NoResyncPeriodFunc()) - c, err := NewDeploymentController(informers.Apps().V1().Deployments(), informers.Apps().V1().ReplicaSets(), informers.Core().V1().Pods(), f.client) + c, err := NewDeploymentController(ctx, informers.Apps().V1().Deployments(), informers.Apps().V1().ReplicaSets(), informers.Core().V1().Pods(), f.client) if err != nil { return nil, nil, err } @@ -204,16 +206,16 @@ func (f *fixture) newController() (*DeploymentController, informers.SharedInform return c, informers, nil } -func (f *fixture) runExpectError(deploymentName string, startInformers bool) { - f.run_(deploymentName, startInformers, true) +func (f *fixture) runExpectError(ctx context.Context, deploymentName string, startInformers bool) { + f.run_(ctx, deploymentName, startInformers, true) } -func (f *fixture) run(deploymentName string) { - f.run_(deploymentName, true, false) +func (f *fixture) run(ctx context.Context, deploymentName string) { + f.run_(ctx, deploymentName, true, false) } -func (f *fixture) run_(deploymentName string, startInformers bool, expectError bool) { - c, informers, err := f.newController() +func (f *fixture) run_(ctx context.Context, deploymentName string, startInformers bool, expectError bool) { + c, informers, err := f.newController(ctx) if err != nil { f.t.Fatalf("error creating Deployment controller: %v", err) } @@ -223,7 +225,7 @@ func (f *fixture) run_(deploymentName string, startInformers bool, expectError b informers.Start(stopCh) } - err = c.syncDeployment(context.TODO(), deploymentName) + err = c.syncDeployment(ctx, deploymentName) if !expectError && err != nil { f.t.Errorf("error syncing deployment: %v", err) } else if expectError && err == nil { @@ -268,6 +270,8 @@ func filterInformerActions(actions []core.Action) []core.Action { } func TestSyncDeploymentCreatesReplicaSet(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -280,10 +284,12 @@ func TestSyncDeploymentCreatesReplicaSet(t *testing.T) { f.expectUpdateDeploymentStatusAction(d) f.expectUpdateDeploymentStatusAction(d) - f.run(testutil.GetKey(d, t)) + f.run(ctx, testutil.GetKey(d, t)) } func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -293,10 +299,12 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) { f.objects = append(f.objects, d) f.expectUpdateDeploymentStatusAction(d) - f.run(testutil.GetKey(d, t)) + f.run(ctx, testutil.GetKey(d, t)) } func TestSyncDeploymentDeletionRace(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -318,11 +326,15 @@ func TestSyncDeploymentDeletionRace(t *testing.T) { f.expectGetDeploymentAction(d) // Sync should fail and requeue to let cache catch up. // Don't start informers, since we don't want cache to catch up for this test. - f.runExpectError(testutil.GetKey(d, t), false) + f.runExpectError(ctx, testutil.GetKey(d, t), false) } // issue: https://github.com/kubernetes/kubernetes/issues/23218 func TestDontSyncDeploymentsWithEmptyPodSelector(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + f := newFixture(t) d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -332,10 +344,12 @@ func TestDontSyncDeploymentsWithEmptyPodSelector(t *testing.T) { // Normally there should be a status update to sync observedGeneration but the fake // deployment has no generation set so there is no action happening here. - f.run(testutil.GetKey(d, t)) + f.run(ctx, testutil.GetKey(d, t)) } func TestReentrantRollback(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -359,13 +373,15 @@ func TestReentrantRollback(t *testing.T) { // Rollback is done here f.expectUpdateDeploymentAction(d) // Expect no update on replica sets though - f.run(testutil.GetKey(d, t)) + f.run(ctx, testutil.GetKey(d, t)) } // TestPodDeletionEnqueuesRecreateDeployment ensures that the deletion of a pod // will requeue a Recreate deployment iff there is no other pod returned from the // client. func TestPodDeletionEnqueuesRecreateDeployment(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -377,7 +393,7 @@ func TestPodDeletionEnqueuesRecreateDeployment(t *testing.T) { f.rsLister = append(f.rsLister, rs) f.objects = append(f.objects, foo, rs) - c, _, err := f.newController() + c, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -388,7 +404,7 @@ func TestPodDeletionEnqueuesRecreateDeployment(t *testing.T) { } } - c.deletePod(pod) + c.deletePod(logger, pod) if !enqueued { t.Errorf("expected deployment %q to be queued after pod deletion", foo.Name) @@ -399,6 +415,8 @@ func TestPodDeletionEnqueuesRecreateDeployment(t *testing.T) { // will not requeue a Recreate deployment iff there are other pods returned from the // client. func TestPodDeletionDoesntEnqueueRecreateDeployment(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -413,7 +431,7 @@ func TestPodDeletionDoesntEnqueueRecreateDeployment(t *testing.T) { // return a non-empty list. f.podLister = append(f.podLister, pod1, pod2) - c, _, err := f.newController() + c, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -424,7 +442,7 @@ func TestPodDeletionDoesntEnqueueRecreateDeployment(t *testing.T) { } } - c.deletePod(pod1) + c.deletePod(logger, pod1) if enqueued { t.Errorf("expected deployment %q not to be queued after pod deletion", foo.Name) @@ -436,6 +454,8 @@ func TestPodDeletionDoesntEnqueueRecreateDeployment(t *testing.T) { // pod returned from the client in the case where a deployment has multiple replica // sets, some of which have empty owner references. func TestPodDeletionPartialReplicaSetOwnershipEnqueueRecreateDeployment(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -449,7 +469,7 @@ func TestPodDeletionPartialReplicaSetOwnershipEnqueueRecreateDeployment(t *testi f.rsLister = append(f.rsLister, rs1, rs2) f.objects = append(f.objects, foo, rs1, rs2) - c, _, err := f.newController() + c, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -460,7 +480,7 @@ func TestPodDeletionPartialReplicaSetOwnershipEnqueueRecreateDeployment(t *testi } } - c.deletePod(pod) + c.deletePod(logger, pod) if !enqueued { t.Errorf("expected deployment %q to be queued after pod deletion", foo.Name) @@ -472,6 +492,8 @@ func TestPodDeletionPartialReplicaSetOwnershipEnqueueRecreateDeployment(t *testi // returned from the client in the case where a deployment has multiple replica sets, // some of which have empty owner references. func TestPodDeletionPartialReplicaSetOwnershipDoesntEnqueueRecreateDeployment(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -488,7 +510,7 @@ func TestPodDeletionPartialReplicaSetOwnershipDoesntEnqueueRecreateDeployment(t // return a non-empty list. f.podLister = append(f.podLister, pod) - c, _, err := f.newController() + c, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -499,7 +521,7 @@ func TestPodDeletionPartialReplicaSetOwnershipDoesntEnqueueRecreateDeployment(t } } - c.deletePod(pod) + c.deletePod(logger, pod) if enqueued { t.Errorf("expected deployment %q not to be queued after pod deletion", foo.Name) @@ -507,6 +529,8 @@ func TestPodDeletionPartialReplicaSetOwnershipDoesntEnqueueRecreateDeployment(t } func TestGetReplicaSetsForDeployment(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + f := newFixture(t) // Two Deployments with same labels. @@ -523,7 +547,7 @@ func TestGetReplicaSetsForDeployment(t *testing.T) { f.objects = append(f.objects, d1, d2, rs1, rs2) // Start the fixture. - c, informers, err := f.newController() + c, informers, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -531,7 +555,7 @@ func TestGetReplicaSetsForDeployment(t *testing.T) { defer close(stopCh) informers.Start(stopCh) - rsList, err := c.getReplicaSetsForDeployment(context.TODO(), d1) + rsList, err := c.getReplicaSetsForDeployment(ctx, d1) if err != nil { t.Fatalf("getReplicaSetsForDeployment() error: %v", err) } @@ -543,7 +567,7 @@ func TestGetReplicaSetsForDeployment(t *testing.T) { t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs1.Name) } - rsList, err = c.getReplicaSetsForDeployment(context.TODO(), d2) + rsList, err = c.getReplicaSetsForDeployment(ctx, d2) if err != nil { t.Fatalf("getReplicaSetsForDeployment() error: %v", err) } @@ -557,6 +581,8 @@ func TestGetReplicaSetsForDeployment(t *testing.T) { } func TestGetReplicaSetsForDeploymentAdoptRelease(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -573,7 +599,7 @@ func TestGetReplicaSetsForDeploymentAdoptRelease(t *testing.T) { f.objects = append(f.objects, d, rsAdopt, rsRelease) // Start the fixture. - c, informers, err := f.newController() + c, informers, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -581,7 +607,7 @@ func TestGetReplicaSetsForDeploymentAdoptRelease(t *testing.T) { defer close(stopCh) informers.Start(stopCh) - rsList, err := c.getReplicaSetsForDeployment(context.TODO(), d) + rsList, err := c.getReplicaSetsForDeployment(ctx, d) if err != nil { t.Fatalf("getReplicaSetsForDeployment() error: %v", err) } @@ -595,6 +621,8 @@ func TestGetReplicaSetsForDeploymentAdoptRelease(t *testing.T) { } func TestGetPodMapForReplicaSets(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -620,7 +648,7 @@ func TestGetPodMapForReplicaSets(t *testing.T) { f.objects = append(f.objects, d, rs1, rs2, pod1, pod2, pod3, pod4) // Start the fixture. - c, informers, err := f.newController() + c, informers, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -661,6 +689,8 @@ func TestGetPodMapForReplicaSets(t *testing.T) { } func TestAddReplicaSet(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -676,12 +706,12 @@ func TestAddReplicaSet(t *testing.T) { // Create the fixture but don't start it, // so nothing happens in the background. - dc, _, err := f.newController() + dc, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } - dc.addReplicaSet(rs1) + dc.addReplicaSet(klog.FromContext(ctx), rs1) if got, want := dc.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -694,7 +724,7 @@ func TestAddReplicaSet(t *testing.T) { t.Errorf("queue.Get() = %v, want %v", got, want) } - dc.addReplicaSet(rs2) + dc.addReplicaSet(logger, rs2) if got, want := dc.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -709,6 +739,8 @@ func TestAddReplicaSet(t *testing.T) { } func TestAddReplicaSetOrphan(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) // 2 will match the RS, 1 won't. @@ -726,18 +758,20 @@ func TestAddReplicaSetOrphan(t *testing.T) { // Create the fixture but don't start it, // so nothing happens in the background. - dc, _, err := f.newController() + dc, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } - dc.addReplicaSet(rs) + dc.addReplicaSet(logger, rs) if got, want := dc.queue.Len(), 2; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } } func TestUpdateReplicaSet(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -754,7 +788,7 @@ func TestUpdateReplicaSet(t *testing.T) { // Create the fixture but don't start it, // so nothing happens in the background. - dc, _, err := f.newController() + dc, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -762,7 +796,7 @@ func TestUpdateReplicaSet(t *testing.T) { prev := *rs1 next := *rs1 bumpResourceVersion(&next) - dc.updateReplicaSet(&prev, &next) + dc.updateReplicaSet(logger, &prev, &next) if got, want := dc.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -778,7 +812,7 @@ func TestUpdateReplicaSet(t *testing.T) { prev = *rs2 next = *rs2 bumpResourceVersion(&next) - dc.updateReplicaSet(&prev, &next) + dc.updateReplicaSet(logger, &prev, &next) if got, want := dc.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -793,6 +827,8 @@ func TestUpdateReplicaSet(t *testing.T) { } func TestUpdateReplicaSetOrphanWithNewLabels(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -808,7 +844,7 @@ func TestUpdateReplicaSetOrphanWithNewLabels(t *testing.T) { // Create the fixture but don't start it, // so nothing happens in the background. - dc, _, err := f.newController() + dc, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -818,13 +854,15 @@ func TestUpdateReplicaSetOrphanWithNewLabels(t *testing.T) { prev.Labels = map[string]string{"foo": "notbar"} next := *rs bumpResourceVersion(&next) - dc.updateReplicaSet(&prev, &next) + dc.updateReplicaSet(logger, &prev, &next) if got, want := dc.queue.Len(), 2; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } } func TestUpdateReplicaSetChangeControllerRef(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -838,7 +876,7 @@ func TestUpdateReplicaSetChangeControllerRef(t *testing.T) { // Create the fixture but don't start it, // so nothing happens in the background. - dc, _, err := f.newController() + dc, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -848,13 +886,15 @@ func TestUpdateReplicaSetChangeControllerRef(t *testing.T) { prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(d2, controllerKind)} next := *rs bumpResourceVersion(&next) - dc.updateReplicaSet(&prev, &next) + dc.updateReplicaSet(logger, &prev, &next) if got, want := dc.queue.Len(), 2; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } } func TestUpdateReplicaSetRelease(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -868,7 +908,7 @@ func TestUpdateReplicaSetRelease(t *testing.T) { // Create the fixture but don't start it, // so nothing happens in the background. - dc, _, err := f.newController() + dc, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -878,13 +918,15 @@ func TestUpdateReplicaSetRelease(t *testing.T) { next := *rs next.OwnerReferences = nil bumpResourceVersion(&next) - dc.updateReplicaSet(&prev, &next) + dc.updateReplicaSet(logger, &prev, &next) if got, want := dc.queue.Len(), 2; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } } func TestDeleteReplicaSet(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -901,12 +943,12 @@ func TestDeleteReplicaSet(t *testing.T) { // Create the fixture but don't start it, // so nothing happens in the background. - dc, _, err := f.newController() + dc, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } - dc.deleteReplicaSet(rs1) + dc.deleteReplicaSet(logger, rs1) if got, want := dc.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -919,7 +961,7 @@ func TestDeleteReplicaSet(t *testing.T) { t.Errorf("queue.Get() = %v, want %v", got, want) } - dc.deleteReplicaSet(rs2) + dc.deleteReplicaSet(logger, rs2) if got, want := dc.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -934,6 +976,8 @@ func TestDeleteReplicaSet(t *testing.T) { } func TestDeleteReplicaSetOrphan(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + f := newFixture(t) d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -949,18 +993,20 @@ func TestDeleteReplicaSetOrphan(t *testing.T) { // Create the fixture but don't start it, // so nothing happens in the background. - dc, _, err := f.newController() + dc, _, err := f.newController(ctx) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } - dc.deleteReplicaSet(rs) + dc.deleteReplicaSet(logger, rs) if got, want := dc.queue.Len(), 0; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } } func BenchmarkGetPodMapForDeployment(b *testing.B) { + _, ctx := ktesting.NewTestContext(b) + f := newFixture(b) d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) @@ -984,7 +1030,7 @@ func BenchmarkGetPodMapForDeployment(b *testing.B) { f.objects = append(f.objects, objects...) // Start the fixture. - c, informers, err := f.newController() + c, informers, err := f.newController(ctx) if err != nil { b.Fatalf("error creating Deployment controller: %v", err) } diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go index 65cbb717a9d..279e201b706 100644 --- a/pkg/controller/deployment/progress.go +++ b/pkg/controller/deployment/progress.go @@ -22,11 +22,10 @@ import ( "reflect" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/klog/v2" - apps "k8s.io/api/apps/v1" "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/deployment/util" ) @@ -84,7 +83,7 @@ func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs [] } util.SetDeploymentCondition(&newStatus, *condition) - case util.DeploymentTimedOut(d, &newStatus): + case util.DeploymentTimedOut(ctx, d, &newStatus): // Update the deployment with a timeout condition. If the condition already exists, // we ignore this update. msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name) @@ -108,7 +107,7 @@ func (dc *DeploymentController) syncRolloutStatus(ctx context.Context, allRSs [] // Do not update if there is nothing new to add. if reflect.DeepEqual(d.Status, newStatus) { // Requeue the deployment if required. - dc.requeueStuckDeployment(d, newStatus) + dc.requeueStuckDeployment(ctx, d, newStatus) return nil } @@ -158,7 +157,8 @@ var nowFn = func() time.Time { return time.Now() } // requeueStuckDeployment checks whether the provided deployment needs to be synced for a progress // check. It returns the time after the deployment will be requeued for the progress check, 0 if it // will be requeued now, or -1 if it does not need to be requeued. -func (dc *DeploymentController) requeueStuckDeployment(d *apps.Deployment, newStatus apps.DeploymentStatus) time.Duration { +func (dc *DeploymentController) requeueStuckDeployment(ctx context.Context, d *apps.Deployment, newStatus apps.DeploymentStatus) time.Duration { + logger := klog.FromContext(ctx) currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing) // Can't estimate progress if there is no deadline in the spec or progressing condition in the current status. if !util.HasProgressDeadline(d) || currentCond == nil { @@ -188,11 +188,11 @@ func (dc *DeploymentController) requeueStuckDeployment(d *apps.Deployment, newSt // Make it ratelimited so we stay on the safe side, eventually the Deployment should // transition either to a Complete or to a TimedOut condition. if after < time.Second { - klog.V(4).Infof("Queueing up deployment %q for a progress check now", d.Name) + logger.V(4).Info("Queueing up deployment for a progress check now", "deployment", klog.KObj(d)) dc.enqueueRateLimited(d) return time.Duration(0) } - klog.V(4).Infof("Queueing up deployment %q for a progress check after %ds", d.Name, int(after.Seconds())) + logger.V(4).Info("Queueing up deployment for a progress check", "deployment", klog.KObj(d), "queueAfter", int(after.Seconds())) // Add a second to avoid milliseconds skew in AddAfter. // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. dc.enqueueAfter(d, after+time.Second) diff --git a/pkg/controller/deployment/progress_test.go b/pkg/controller/deployment/progress_test.go index 681a079de6d..0145711f3c4 100644 --- a/pkg/controller/deployment/progress_test.go +++ b/pkg/controller/deployment/progress_test.go @@ -23,10 +23,11 @@ import ( "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller/deployment/util" ) @@ -176,7 +177,10 @@ func TestRequeueStuckDeployment(t *testing.T) { if test.nowFn != nil { nowFn = test.nowFn } - got := dc.requeueStuckDeployment(test.d, test.status) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + got := dc.requeueStuckDeployment(ctx, test.d, test.status) if got != test.expected { t.Errorf("%s: got duration: %v, expected duration: %v", test.name, got, test.expected) } @@ -330,8 +334,8 @@ func TestSyncRolloutStatus(t *testing.T) { if test.newRS != nil { test.allRSs = append(test.allRSs, test.newRS) } - - err := dc.syncRolloutStatus(context.TODO(), test.allRSs, test.newRS, test.d) + _, ctx := ktesting.NewTestContext(t) + err := dc.syncRolloutStatus(ctx, test.allRSs, test.newRS, test.d) if err != nil { t.Error(err) } diff --git a/pkg/controller/deployment/recreate_test.go b/pkg/controller/deployment/recreate_test.go index 315c683d6f0..b3f4bfc4a82 100644 --- a/pkg/controller/deployment/recreate_test.go +++ b/pkg/controller/deployment/recreate_test.go @@ -17,17 +17,17 @@ limitations under the License. package deployment import ( - "context" "fmt" "testing" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" ) @@ -66,13 +66,14 @@ func TestScaleDownOldReplicaSets(t *testing.T) { kc := fake.NewSimpleClientset(expected...) informers := informers.NewSharedInformerFactory(kc, controller.NoResyncPeriodFunc()) - c, err := NewDeploymentController(informers.Apps().V1().Deployments(), informers.Apps().V1().ReplicaSets(), informers.Core().V1().Pods(), kc) + _, ctx := ktesting.NewTestContext(t) + c, err := NewDeploymentController(ctx, informers.Apps().V1().Deployments(), informers.Apps().V1().ReplicaSets(), informers.Core().V1().Pods(), kc) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } c.eventRecorder = &record.FakeRecorder{} - c.scaleDownOldReplicaSetsForRecreate(context.TODO(), oldRSs, test.d) + c.scaleDownOldReplicaSetsForRecreate(ctx, oldRSs, test.d) for j := range oldRSs { rs := oldRSs[j] diff --git a/pkg/controller/deployment/rollback.go b/pkg/controller/deployment/rollback.go index d27c0fc019d..af5eedd6a85 100644 --- a/pkg/controller/deployment/rollback.go +++ b/pkg/controller/deployment/rollback.go @@ -21,17 +21,17 @@ import ( "fmt" "strconv" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/klog/v2" - apps "k8s.io/api/apps/v1" "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" ) // rollback the deployment to the specified revision. In any case cleanup the rollback spec. func (dc *DeploymentController) rollback(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error { + logger := klog.FromContext(ctx) newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true) if err != nil { return err @@ -51,11 +51,11 @@ func (dc *DeploymentController) rollback(ctx context.Context, d *apps.Deployment for _, rs := range allRSs { v, err := deploymentutil.Revision(rs) if err != nil { - klog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err) + logger.V(4).Info("Unable to extract revision from deployment's replica set", "replicaSet", klog.KObj(rs), "err", err) continue } if v == rollbackTo.Revision { - klog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v) + logger.V(4).Info("Found replica set with desired revision", "replicaSet", klog.KObj(rs), "revision", v) // rollback by copying podTemplate.Spec from the replica set // revision number will be incremented during the next getAllReplicaSetsAndSyncRevision call // no-op if the spec matches current deployment's podTemplate.Spec @@ -75,9 +75,10 @@ func (dc *DeploymentController) rollback(ctx context.Context, d *apps.Deployment // updates the deployment with the replica set template in case they are different. It also // cleans up the rollback spec so subsequent requeues of the deployment won't end up in here. func (dc *DeploymentController) rollbackToTemplate(ctx context.Context, d *apps.Deployment, rs *apps.ReplicaSet) (bool, error) { + logger := klog.FromContext(ctx) performedRollback := false if !deploymentutil.EqualIgnoreHash(&d.Spec.Template, &rs.Spec.Template) { - klog.V(4).Infof("Rolling back deployment %q to template spec %+v", d.Name, rs.Spec.Template.Spec) + logger.V(4).Info("Rolling back deployment to old template spec", "deployment", klog.KObj(d), "templateSpec", rs.Spec.Template.Spec) deploymentutil.SetFromReplicaSetTemplate(d, rs.Spec.Template) // set RS (the old RS we'll rolling back to) annotations back to the deployment; // otherwise, the deployment's current annotations (should be the same as current new RS) will be copied to the RS after the rollback. @@ -93,7 +94,7 @@ func (dc *DeploymentController) rollbackToTemplate(ctx context.Context, d *apps. deploymentutil.SetDeploymentAnnotationsTo(d, rs) performedRollback = true } else { - klog.V(4).Infof("Rolling back to a revision that contains the same template as current deployment %q, skipping rollback...", d.Name) + logger.V(4).Info("Rolling back to a revision that contains the same template as current deployment, skipping rollback...", "deployment", klog.KObj(d)) eventMsg := fmt.Sprintf("The rollback revision contains the same template as current deployment %q", d.Name) dc.emitRollbackWarningEvent(d, deploymentutil.RollbackTemplateUnchanged, eventMsg) } @@ -113,7 +114,8 @@ func (dc *DeploymentController) emitRollbackNormalEvent(d *apps.Deployment, mess // It is assumed that the caller will have updated the deployment template appropriately (in case // we want to rollback). func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(ctx context.Context, d *apps.Deployment) error { - klog.V(4).Infof("Cleans up rollbackTo of deployment %q", d.Name) + logger := klog.FromContext(ctx) + logger.V(4).Info("Cleans up rollbackTo of deployment", "deployment", klog.KObj(d)) setRollbackTo(d, nil) _, err := dc.client.AppsV1().Deployments(d.Namespace).Update(ctx, d, metav1.UpdateOptions{}) return err diff --git a/pkg/controller/deployment/rolling.go b/pkg/controller/deployment/rolling.go index 5e75046744a..1d3446b5f7f 100644 --- a/pkg/controller/deployment/rolling.go +++ b/pkg/controller/deployment/rolling.go @@ -85,14 +85,14 @@ func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allR } func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) { + logger := klog.FromContext(ctx) oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs) if oldPodsCount == 0 { // Can't scale down further return false, nil } - allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs) - klog.V(4).Infof("New replica set %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRS.Status.AvailableReplicas) + logger.V(4).Info("New replica set", "replicaSet", klog.KObj(newRS), "availableReplicas", newRS.Status.AvailableReplicas) maxUnavailable := deploymentutil.MaxUnavailable(*deployment) // Check if we can scale down. We can scale down in the following 2 cases: @@ -138,7 +138,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, all if err != nil { return false, nil } - klog.V(4).Infof("Cleaned up unhealthy replicas from old RSes by %d", cleanupCount) + logger.V(4).Info("Cleaned up unhealthy replicas from old RSes", "count", cleanupCount) // Scale down old replica sets, need check maxUnavailable to ensure we can scale down allRSs = append(oldRSs, newRS) @@ -146,7 +146,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, all if err != nil { return false, nil } - klog.V(4).Infof("Scaled down old RSes of deployment %s by %d", deployment.Name, scaledDownCount) + logger.V(4).Info("Scaled down old RSes", "deployment", klog.KObj(deployment), "count", scaledDownCount) totalScaledDown := cleanupCount + scaledDownCount return totalScaledDown > 0, nil @@ -154,6 +154,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, all // cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted. func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error) { + logger := klog.FromContext(ctx) sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) // Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order // such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will @@ -167,7 +168,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, ol // cannot scale down this replica set. continue } - klog.V(4).Infof("Found %d available pods in old RS %s/%s", targetRS.Status.AvailableReplicas, targetRS.Namespace, targetRS.Name) + logger.V(4).Info("Found available pods in old RS", "replicaSet", klog.KObj(targetRS), "availableReplicas", targetRS.Status.AvailableReplicas) if *(targetRS.Spec.Replicas) == targetRS.Status.AvailableReplicas { // no unhealthy replicas found, no scaling required. continue @@ -191,6 +192,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, ol // scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate". // Need check maxUnavailable to ensure availability func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) { + logger := klog.FromContext(ctx) maxUnavailable := deploymentutil.MaxUnavailable(*deployment) // Check if we can scale down. @@ -201,7 +203,7 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx cont // Cannot scale down. return 0, nil } - klog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name) + logger.V(4).Info("Found available pods in deployment, scaling down old RSes", "deployment", klog.KObj(deployment), "availableReplicas", availablePodCount) sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) diff --git a/pkg/controller/deployment/rolling_test.go b/pkg/controller/deployment/rolling_test.go index 1cd93050aaf..80f20e0343a 100644 --- a/pkg/controller/deployment/rolling_test.go +++ b/pkg/controller/deployment/rolling_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2/ktesting" ) func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) { @@ -91,7 +92,10 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) { client: &fake, eventRecorder: &record.FakeRecorder{}, } - scaled, err := controller.reconcileNewReplicaSet(context.TODO(), allRSs, newRS, deployment) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + scaled, err := controller.reconcileNewReplicaSet(ctx, allRSs, newRS, deployment) if err != nil { t.Errorf("unexpected error: %v", err) continue @@ -197,8 +201,8 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) { client: &fakeClientset, eventRecorder: &record.FakeRecorder{}, } - - scaled, err := controller.reconcileOldReplicaSets(context.TODO(), allRSs, oldRSs, newRS, deployment) + _, ctx := ktesting.NewTestContext(t) + scaled, err := controller.reconcileOldReplicaSets(ctx, allRSs, oldRSs, newRS, deployment) if err != nil { t.Errorf("unexpected error: %v", err) continue @@ -266,7 +270,8 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) { client: &fakeClientset, eventRecorder: &record.FakeRecorder{}, } - _, cleanupCount, err := controller.cleanupUnhealthyReplicas(context.TODO(), oldRSs, deployment, int32(test.maxCleanupCount)) + _, ctx := ktesting.NewTestContext(t) + _, cleanupCount, err := controller.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, int32(test.maxCleanupCount)) if err != nil { t.Errorf("unexpected error: %v", err) continue @@ -340,7 +345,8 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing client: &fakeClientset, eventRecorder: &record.FakeRecorder{}, } - scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(context.TODO(), allRSs, oldRSs, deployment) + _, ctx := ktesting.NewTestContext(t) + scaled, err := controller.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment) if err != nil { t.Errorf("unexpected error: %v", err) continue diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index d5a7861ba1f..c7ede39aa68 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -136,6 +136,7 @@ const ( // 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas. // Note that the pod-template-hash will be added to adopted RSes and pods. func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) { + logger := klog.FromContext(ctx) existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList) // Calculate the max revision number among all old RSes @@ -151,7 +152,7 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De rsCopy := existingNewRS.DeepCopy() // Set existing new replica set's annotation - annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, newRevision, true, maxRevHistoryLengthInChars) + annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(ctx, d, rsCopy, newRevision, true, maxRevHistoryLengthInChars) minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds if annotationsUpdated || minReadySecondsNeedsUpdate { rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds @@ -215,7 +216,7 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De *(newRS.Spec.Replicas) = newReplicasCount // Set new replica set's annotation - deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, newRevision, false, maxRevHistoryLengthInChars) + deploymentutil.SetNewReplicaSetAnnotations(ctx, d, &newRS, newRevision, false, maxRevHistoryLengthInChars) // Create the new ReplicaSet. If it already exists, then we need to check for possible // hash collisions. If there is any other error, we need to report it in the status of // the Deployment. @@ -254,7 +255,7 @@ func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.De // error. _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) if dErr == nil { - klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount) + logger.V(2).Info("Found a hash collision for deployment - bumping collisionCount to resolve it", "deployment", klog.KObj(d), "oldCollisionCount", preCollisionCount, "newCollisionCount", *d.Status.CollisionCount) } return nil, err case errors.HasStatusCause(err, v1.NamespaceTerminatingCause): @@ -355,13 +356,14 @@ func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Depl // value of deploymentReplicasToAdd. deploymentReplicasAdded := int32(0) nameToSize := make(map[string]int32) + logger := klog.FromContext(ctx) for i := range allRSs { rs := allRSs[i] // Estimate proportions if we have replicas to add, otherwise simply populate // nameToSize with the current sizes for each replica set. if deploymentReplicasToAdd != 0 { - proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) + proportion := deploymentutil.GetProportion(logger, rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded) nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion deploymentReplicasAdded += proportion @@ -434,6 +436,7 @@ func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.Re // where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept // around by default 1) for historical reasons and 2) for the ability to rollback a deployment. func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error { + logger := klog.FromContext(ctx) if !deploymentutil.HasRevisionHistoryLimit(deployment) { return nil } @@ -450,7 +453,7 @@ func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs [] } sort.Sort(deploymentutil.ReplicaSetsByRevision(cleanableRSes)) - klog.V(4).Infof("Looking to cleanup old replica sets for deployment %q", deployment.Name) + logger.V(4).Info("Looking to cleanup old replica sets for deployment", "deployment", klog.KObj(deployment)) for i := int32(0); i < diff; i++ { rs := cleanableRSes[i] @@ -458,7 +461,7 @@ func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs [] if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil { continue } - klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name) + logger.V(4).Info("Trying to cleanup replica set for deployment", "replicaSet", klog.KObj(rs), "deployment", klog.KObj(deployment)) if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { // Return error instead of aggregating and continuing DELETEs on the theory // that we may be overloading the api server. @@ -532,8 +535,9 @@ func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Depl return false, err } allRSs := append(oldRSs, newRS) + logger := klog.FromContext(ctx) for _, rs := range controller.FilterActiveReplicaSets(allRSs) { - desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs) + desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, rs) if !ok { continue } diff --git a/pkg/controller/deployment/sync_test.go b/pkg/controller/deployment/sync_test.go index 25b55d12217..f5f2f899d3f 100644 --- a/pkg/controller/deployment/sync_test.go +++ b/pkg/controller/deployment/sync_test.go @@ -17,7 +17,6 @@ limitations under the License. package deployment import ( - "context" "math" "testing" "time" @@ -30,6 +29,7 @@ import ( "k8s.io/client-go/kubernetes/fake" testclient "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" ) @@ -298,7 +298,9 @@ func TestScale(t *testing.T) { deploymentutil.SetReplicasAnnotations(rs, desiredReplicas, desiredReplicas+deploymentutil.MaxSurge(*test.oldDeployment)) } - if err := dc.scale(context.TODO(), test.deployment, test.newRS, test.oldRSs); err != nil { + _, ctx := ktesting.NewTestContext(t) + + if err := dc.scale(ctx, test.deployment, test.newRS, test.oldRSs); err != nil { t.Errorf("%s: unexpected error: %v", test.name, err) return } @@ -412,9 +414,11 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) { test := tests[i] t.Logf("scenario %d", i) + _, ctx := ktesting.NewTestContext(t) + fake := &fake.Clientset{} informers := informers.NewSharedInformerFactory(fake, controller.NoResyncPeriodFunc()) - controller, err := NewDeploymentController(informers.Apps().V1().Deployments(), informers.Apps().V1().ReplicaSets(), informers.Core().V1().Pods(), fake) + controller, err := NewDeploymentController(ctx, informers.Apps().V1().Deployments(), informers.Apps().V1().ReplicaSets(), informers.Core().V1().Pods(), fake) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -434,7 +438,7 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) { t.Logf(" &test.revisionHistoryLimit: %d", test.revisionHistoryLimit) d := newDeployment("foo", 1, &test.revisionHistoryLimit, nil, nil, map[string]string{"foo": "bar"}) - controller.cleanupDeployment(context.TODO(), test.oldRSs, d) + controller.cleanupDeployment(ctx, test.oldRSs, d) gotDeletions := 0 for _, action := range fake.Actions() { @@ -546,9 +550,11 @@ func TestDeploymentController_cleanupDeploymentOrder(t *testing.T) { test := tests[i] t.Logf("scenario %d", i) + _, ctx := ktesting.NewTestContext(t) + fake := &fake.Clientset{} informers := informers.NewSharedInformerFactory(fake, controller.NoResyncPeriodFunc()) - controller, err := NewDeploymentController(informers.Apps().V1().Deployments(), informers.Apps().V1().ReplicaSets(), informers.Core().V1().Pods(), fake) + controller, err := NewDeploymentController(ctx, informers.Apps().V1().Deployments(), informers.Apps().V1().ReplicaSets(), informers.Core().V1().Pods(), fake) if err != nil { t.Fatalf("error creating Deployment controller: %v", err) } @@ -566,7 +572,7 @@ func TestDeploymentController_cleanupDeploymentOrder(t *testing.T) { informers.Start(stopCh) d := newDeployment("foo", 1, &test.revisionHistoryLimit, nil, nil, map[string]string{"foo": "bar"}) - controller.cleanupDeployment(context.TODO(), test.oldRSs, d) + controller.cleanupDeployment(ctx, test.oldRSs, d) deletedRSs := sets.String{} for _, action := range fake.Actions() { diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index c8f12613b72..347c284ccb9 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -189,7 +189,7 @@ func MaxRevision(allRSs []*apps.ReplicaSet) int64 { for _, rs := range allRSs { if v, err := Revision(rs); err != nil { // Skip the replica sets when it failed to parse their revision information - klog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) + klog.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err) } else if v > max { max = v } @@ -203,7 +203,7 @@ func LastRevision(allRSs []*apps.ReplicaSet) int64 { for _, rs := range allRSs { if v, err := Revision(rs); err != nil { // Skip the replica sets when it failed to parse their revision information - klog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs) + klog.V(4).Info("Couldn't parse revision for replica set, deployment controller will skip it when reconciling revisions", "replicaSet", klog.KObj(rs), "err", err) } else if v >= max { secMax = max max = v @@ -229,7 +229,8 @@ func Revision(obj runtime.Object) (int64, error) { // SetNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and // copying required deployment annotations to it; it returns true if replica set's annotation is changed. -func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.ReplicaSet, newRevision string, exists bool, revHistoryLimitInChars int) bool { +func SetNewReplicaSetAnnotations(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, newRevision string, exists bool, revHistoryLimitInChars int) bool { + logger := klog.FromContext(ctx) // First, copy deployment's annotations (except for apply and revision annotations) annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS) // Then, update replica set's revision annotation @@ -244,7 +245,7 @@ func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.Replic oldRevisionInt, err := strconv.ParseInt(oldRevision, 10, 64) if err != nil { if oldRevision != "" { - klog.Warningf("Updating replica set revision OldRevision not int %s", err) + logger.Info("Updating replica set revision OldRevision not int", "err", err) return false } //If the RS annotation is empty then initialise it to 0 @@ -252,13 +253,13 @@ func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.Replic } newRevisionInt, err := strconv.ParseInt(newRevision, 10, 64) if err != nil { - klog.Warningf("Updating replica set revision NewRevision not int %s", err) + logger.Info("Updating replica set revision NewRevision not int", "err", err) return false } if oldRevisionInt < newRevisionInt { newRS.Annotations[RevisionAnnotation] = newRevision annotationChanged = true - klog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision) + logger.V(4).Info("Updating replica set revision", "replicaSet", klog.KObj(newRS), "newRevision", newRevision) } // If a revision annotation already existed and this replica set was updated with a new revision // then that means we are rolling back to this replica set. We need to preserve the old revisions @@ -280,7 +281,7 @@ func SetNewReplicaSetAnnotations(deployment *apps.Deployment, newRS *apps.Replic oldRevisions = append(oldRevisions[start:], oldRevision) newRS.Annotations[RevisionHistoryAnnotation] = strings.Join(oldRevisions, ",") } else { - klog.Warningf("Not appending revision due to length limit of %v reached", revHistoryLimitInChars) + logger.Info("Not appending revision due to revision history length limit reached", "revisionHistoryLimit", revHistoryLimitInChars) } } } @@ -376,22 +377,22 @@ func FindActiveOrLatest(newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) *apps } // GetDesiredReplicasAnnotation returns the number of desired replicas -func GetDesiredReplicasAnnotation(rs *apps.ReplicaSet) (int32, bool) { - return getIntFromAnnotation(rs, DesiredReplicasAnnotation) +func GetDesiredReplicasAnnotation(logger klog.Logger, rs *apps.ReplicaSet) (int32, bool) { + return getIntFromAnnotation(logger, rs, DesiredReplicasAnnotation) } -func getMaxReplicasAnnotation(rs *apps.ReplicaSet) (int32, bool) { - return getIntFromAnnotation(rs, MaxReplicasAnnotation) +func getMaxReplicasAnnotation(logger klog.Logger, rs *apps.ReplicaSet) (int32, bool) { + return getIntFromAnnotation(logger, rs, MaxReplicasAnnotation) } -func getIntFromAnnotation(rs *apps.ReplicaSet, annotationKey string) (int32, bool) { +func getIntFromAnnotation(logger klog.Logger, rs *apps.ReplicaSet, annotationKey string) (int32, bool) { annotationValue, ok := rs.Annotations[annotationKey] if !ok { return int32(0), false } intValue, err := strconv.Atoi(annotationValue) if err != nil { - klog.V(2).Infof("Cannot convert the value %q with annotation key %q for the replica set %q", annotationValue, annotationKey, rs.Name) + logger.V(2).Info("Could not convert the value with annotation key for the replica set", "annotationValue", annotationValue, "annotationKey", annotationKey, "replicaSet", klog.KObj(rs)) return int32(0), false } return int32(intValue), true @@ -466,12 +467,12 @@ func MaxSurge(deployment apps.Deployment) int32 { // GetProportion will estimate the proportion for the provided replica set using 1. the current size // of the parent deployment, 2. the replica count that needs be added on the replica sets of the // deployment, and 3. the total replicas added in the replica sets of the deployment so far. -func GetProportion(rs *apps.ReplicaSet, d apps.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 { +func GetProportion(logger klog.Logger, rs *apps.ReplicaSet, d apps.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 { if rs == nil || *(rs.Spec.Replicas) == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded { return int32(0) } - rsFraction := getReplicaSetFraction(*rs, d) + rsFraction := getReplicaSetFraction(logger, *rs, d) allowed := deploymentReplicasToAdd - deploymentReplicasAdded if deploymentReplicasToAdd > 0 { @@ -488,14 +489,14 @@ func GetProportion(rs *apps.ReplicaSet, d apps.Deployment, deploymentReplicasToA // getReplicaSetFraction estimates the fraction of replicas a replica set can have in // 1. a scaling event during a rollout or 2. when scaling a paused deployment. -func getReplicaSetFraction(rs apps.ReplicaSet, d apps.Deployment) int32 { +func getReplicaSetFraction(logger klog.Logger, rs apps.ReplicaSet, d apps.Deployment) int32 { // If we are scaling down to zero then the fraction of this replica set is its whole size (negative) if *(d.Spec.Replicas) == int32(0) { return -*(rs.Spec.Replicas) } deploymentReplicas := *(d.Spec.Replicas) + MaxSurge(d) - annotatedReplicas, ok := getMaxReplicasAnnotation(&rs) + annotatedReplicas, ok := getMaxReplicasAnnotation(logger, &rs) if !ok { // If we cannot find the annotation then fallback to the current deployment size. Note that this // will not be an accurate proportion estimation in case other replica sets have different values @@ -734,7 +735,7 @@ var nowFn = func() time.Time { return time.Now() } // DeploymentTimedOut considers a deployment to have timed out once its condition that reports progress // is older than progressDeadlineSeconds or a Progressing condition with a TimedOutReason reason already // exists. -func DeploymentTimedOut(deployment *apps.Deployment, newStatus *apps.DeploymentStatus) bool { +func DeploymentTimedOut(ctx context.Context, deployment *apps.Deployment, newStatus *apps.DeploymentStatus) bool { if !HasProgressDeadline(deployment) { return false } @@ -763,7 +764,7 @@ func DeploymentTimedOut(deployment *apps.Deployment, newStatus *apps.DeploymentS if condition.Reason == TimedOutReason { return true } - + logger := klog.FromContext(ctx) // Look at the difference in seconds between now and the last time we reported any // progress or tried to create a replica set, or resumed a paused deployment and // compare against progressDeadlineSeconds. @@ -772,7 +773,7 @@ func DeploymentTimedOut(deployment *apps.Deployment, newStatus *apps.DeploymentS delta := time.Duration(*deployment.Spec.ProgressDeadlineSeconds) * time.Second timedOut := from.Add(delta).Before(now) - klog.V(4).Infof("Deployment %q timed out (%t) [last progress check: %v - now: %v]", deployment.Name, timedOut, from, now) + logger.V(4).Info("Deployment timed out from last progress check", "deployment", klog.KObj(deployment), "timeout", timedOut, "from", from, "now", now) return timedOut } diff --git a/pkg/controller/deployment/util/deployment_util_test.go b/pkg/controller/deployment/util/deployment_util_test.go index 2557df7b67e..ce09f03165a 100644 --- a/pkg/controller/deployment/util/deployment_util_test.go +++ b/pkg/controller/deployment/util/deployment_util_test.go @@ -27,13 +27,14 @@ import ( "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" "k8s.io/utils/pointer" ) @@ -944,7 +945,8 @@ func TestDeploymentTimedOut(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { nowFn = test.nowFn - if got, exp := DeploymentTimedOut(&test.d, &test.d.Status), test.expected; got != exp { + _, ctx := ktesting.NewTestContext(t) + if got, exp := DeploymentTimedOut(ctx, &test.d, &test.d.Status), test.expected; got != exp { t.Errorf("expected timeout: %t, got: %t", exp, got) } }) @@ -1040,11 +1042,13 @@ func TestAnnotationUtils(t *testing.T) { //Test Case 1: Check if anotations are copied properly from deployment to RS t.Run("SetNewReplicaSetAnnotations", func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + //Try to set the increment revision from 11 through 20 for i := 10; i < 20; i++ { nextRevision := fmt.Sprintf("%d", i+1) - SetNewReplicaSetAnnotations(&tDeployment, &tRS, nextRevision, true, 5) + SetNewReplicaSetAnnotations(ctx, &tDeployment, &tRS, nextRevision, true, 5) //Now the ReplicaSets Revision Annotation should be i+1 if i >= 12 { diff --git a/test/integration/deployment/deployment_test.go b/test/integration/deployment/deployment_test.go index f5873b4f5ae..0d8a19d9f20 100644 --- a/test/integration/deployment/deployment_test.go +++ b/test/integration/deployment/deployment_test.go @@ -23,12 +23,13 @@ import ( "testing" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" + "k8s.io/klog/v2/ktesting" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/test/integration/framework" testutil "k8s.io/kubernetes/test/utils" @@ -36,7 +37,11 @@ import ( ) func TestNewDeployment(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-new-deployment" @@ -112,7 +117,11 @@ func TestNewDeployment(t *testing.T) { // TODO: drop the rollback portions of this test when extensions/v1beta1 is no longer served // and rollback endpoint is no longer supported. func TestDeploymentRollingUpdate(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-rolling-update-deployment" @@ -242,7 +251,11 @@ func TestDeploymentSelectorImmutability(t *testing.T) { // Paused deployment should not start new rollout func TestPausedDeployment(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-paused-deployment" @@ -342,7 +355,11 @@ func TestPausedDeployment(t *testing.T) { // Paused deployment can be scaled func TestScalePausedDeployment(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-scale-paused-deployment" @@ -423,7 +440,11 @@ func TestScalePausedDeployment(t *testing.T) { // Deployment rollout shouldn't be blocked on hash collisions func TestDeploymentHashCollision(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-hash-collision-deployment" @@ -522,7 +543,11 @@ func checkPodsHashLabel(pods *v1.PodList) (string, error) { // Deployment should have a timeout condition when it fails to progress after given deadline. func TestFailedDeployment(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-failed-deployment" @@ -566,7 +591,11 @@ func TestFailedDeployment(t *testing.T) { } func TestOverlappingDeployments(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-overlapping-deployments" @@ -647,7 +676,11 @@ func TestOverlappingDeployments(t *testing.T) { // Deployment should not block rollout when updating spec replica number and template at the same time. func TestScaledRolloutDeployment(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-scaled-rollout-deployment" @@ -749,7 +782,7 @@ func TestScaledRolloutDeployment(t *testing.T) { if err != nil { t.Fatalf("failed to get replicaset when checking desired replicas annotation: %v", err) } - desired, ok := deploymentutil.GetDesiredReplicasAnnotation(curRS) + desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, curRS) if !ok { t.Fatalf("failed to retrieve desiredReplicas annotation for replicaset %q", curRS.Name) } @@ -826,7 +859,7 @@ func TestScaledRolloutDeployment(t *testing.T) { if err != nil { t.Fatalf("failed to get replicaset when checking desired replicas annotation: %v", err) } - desired, ok := deploymentutil.GetDesiredReplicasAnnotation(curRS) + desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, curRS) if !ok { t.Fatalf("failed to retrieve desiredReplicas annotation for replicaset %q", curRS.Name) } @@ -837,7 +870,11 @@ func TestScaledRolloutDeployment(t *testing.T) { } func TestSpecReplicasChange(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-spec-replicas-change" @@ -891,7 +928,11 @@ func TestSpecReplicasChange(t *testing.T) { } func TestDeploymentAvailableCondition(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-deployment-available-condition" @@ -1010,7 +1051,11 @@ func testRSControllerRefPatch(t *testing.T, tester *deploymentTester, rs *apps.R } func TestGeneralReplicaSetAdoption(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-general-replicaset-adoption" @@ -1100,7 +1145,11 @@ func testScalingUsingScaleSubresource(t *testing.T, tester *deploymentTester, re } func TestDeploymentScaleSubresource(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-deployment-scale-subresource" @@ -1142,7 +1191,11 @@ func TestDeploymentScaleSubresource(t *testing.T) { // is orphaned, even without PodTemplateSpec change. Refer comment below for more info: // https://github.com/kubernetes/kubernetes/pull/59212#discussion_r166465113 func TestReplicaSetOrphaningAndAdoptionWhenLabelsChange(t *testing.T) { - closeFn, rm, dc, informers, c := dcSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + closeFn, rm, dc, informers, c := dcSetup(ctx, t) defer closeFn() name := "test-replicaset-orphaning-and-adoption-when-labels-change" diff --git a/test/integration/deployment/util.go b/test/integration/deployment/util.go index b2daf7a17e2..bc9d74a7ea0 100644 --- a/test/integration/deployment/util.go +++ b/test/integration/deployment/util.go @@ -103,7 +103,7 @@ func newDeployment(name, ns string, replicas int32) *apps.Deployment { } // dcSetup sets up necessities for Deployment integration test, including control plane, apiserver, informers, and clientset -func dcSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *replicaset.ReplicaSetController, *deployment.DeploymentController, informers.SharedInformerFactory, clientset.Interface) { +func dcSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *replicaset.ReplicaSetController, *deployment.DeploymentController, informers.SharedInformerFactory, clientset.Interface) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) logger, _ := ktesting.NewTestContext(t) @@ -117,6 +117,7 @@ func dcSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *replicaset.Repli informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "deployment-informers")), resyncPeriod) dc, err := deployment.NewDeploymentController( + ctx, informers.Apps().V1().Deployments(), informers.Apps().V1().ReplicaSets(), informers.Core().V1().Pods(),