From f5bd8c86d42908cf90ae7e22e7e5db732acc7870 Mon Sep 17 00:00:00 2001 From: JunYang Date: Sat, 29 Oct 2022 22:34:47 +0800 Subject: [PATCH] namespace controller: use contextual logging --- cmd/kube-controller-manager/app/core.go | 4 +- .../deletion/namespaced_resources_deleter.go | 98 ++++++++++--------- .../namespaced_resources_deleter_test.go | 19 ++-- .../namespace/namespace_controller.go | 40 ++++---- test/e2e_node/services/internal_services.go | 11 ++- .../e2e_node/services/namespace_controller.go | 7 +- .../namespace/ns_conditions_test.go | 21 ++-- 7 files changed, 113 insertions(+), 87 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index cdcbd854b67..5fb6b9b84fd 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -456,7 +456,9 @@ func startModifiedNamespaceController(ctx context.Context, controllerContext Con discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources + ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "namespace")) namespaceController := namespacecontroller.NewNamespaceController( + ctx, namespaceKubeClient, metadataClient, discoverResourcesFn, @@ -464,7 +466,7 @@ func startModifiedNamespaceController(ctx context.Context, controllerContext Con controllerContext.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes, ) - go namespaceController.Run(int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Done()) + go namespaceController.Run(ctx, int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs)) return nil, true, nil } diff --git a/pkg/controller/namespace/deletion/namespaced_resources_deleter.go b/pkg/controller/namespace/deletion/namespaced_resources_deleter.go index 4e8dd5b3a13..0ede5a43b1e 100644 --- a/pkg/controller/namespace/deletion/namespaced_resources_deleter.go +++ b/pkg/controller/namespace/deletion/namespaced_resources_deleter.go @@ -39,11 +39,11 @@ import ( // NamespacedResourcesDeleterInterface is the interface to delete a namespace with all resources in it. type NamespacedResourcesDeleterInterface interface { - Delete(nsName string) error + Delete(ctx context.Context, nsName string) error } // NewNamespacedResourcesDeleter returns a new NamespacedResourcesDeleter. -func NewNamespacedResourcesDeleter(nsClient v1clientset.NamespaceInterface, +func NewNamespacedResourcesDeleter(ctx context.Context, nsClient v1clientset.NamespaceInterface, metadataClient metadata.Interface, podsGetter v1clientset.PodsGetter, discoverResourcesFn func() ([]*metav1.APIResourceList, error), finalizerToken v1.FinalizerName) NamespacedResourcesDeleterInterface { @@ -57,7 +57,7 @@ func NewNamespacedResourcesDeleter(nsClient v1clientset.NamespaceInterface, discoverResourcesFn: discoverResourcesFn, finalizerToken: finalizerToken, } - d.initOpCache() + d.initOpCache(ctx) return d } @@ -93,7 +93,7 @@ type namespacedResourcesDeleter struct { // Returns ResourcesRemainingError if it deleted some resources but needs // to wait for them to go away. // Caller is expected to keep calling this until it succeeds. -func (d *namespacedResourcesDeleter) Delete(nsName string) error { +func (d *namespacedResourcesDeleter) Delete(ctx context.Context, nsName string) error { // Multiple controllers may edit a namespace during termination // first get the latest state of the namespace before proceeding // if the namespace was deleted already, don't do anything @@ -108,7 +108,7 @@ func (d *namespacedResourcesDeleter) Delete(nsName string) error { return nil } - klog.V(5).Infof("namespace controller - syncNamespace - namespace: %s, finalizerToken: %s", namespace.Name, d.finalizerToken) + klog.FromContext(ctx).V(5).Info("Namespace controller - syncNamespace", "namespace", namespace.Name, "finalizerToken", d.finalizerToken) // ensure that the status is up to date on the namespace // if we get a not found error, we assume the namespace is truly gone @@ -131,7 +131,7 @@ func (d *namespacedResourcesDeleter) Delete(nsName string) error { } // there may still be content for us to remove - estimate, err := d.deleteAllContent(namespace) + estimate, err := d.deleteAllContent(ctx, namespace) if err != nil { return err } @@ -153,7 +153,7 @@ func (d *namespacedResourcesDeleter) Delete(nsName string) error { return nil } -func (d *namespacedResourcesDeleter) initOpCache() { +func (d *namespacedResourcesDeleter) initOpCache(ctx context.Context) { // pre-fill opCache with the discovery info // // TODO(sttts): get rid of opCache and http 405 logic around it and trust discovery info @@ -161,14 +161,16 @@ func (d *namespacedResourcesDeleter) initOpCache() { if err != nil { utilruntime.HandleError(fmt.Errorf("unable to get all supported resources from server: %v", err)) } + logger := klog.FromContext(ctx) if len(resources) == 0 { - klog.Fatalf("Unable to get any supported resources from server: %v", err) + logger.Error(err, "Unable to get any supported resources from server") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) } for _, rl := range resources { gv, err := schema.ParseGroupVersion(rl.GroupVersion) if err != nil { - klog.Errorf("Failed to parse GroupVersion %q, skipping: %v", rl.GroupVersion, err) + logger.Error(err, "Failed to parse GroupVersion, skipping", "groupVersion", rl.GroupVersion) continue } @@ -177,7 +179,7 @@ func (d *namespacedResourcesDeleter) initOpCache() { verbs := sets.NewString([]string(r.Verbs)...) if !verbs.Has("delete") { - klog.V(6).Infof("Skipping resource %v because it cannot be deleted.", gvr) + logger.V(6).Info("Skipping resource because it cannot be deleted", "resource", gvr) } for _, op := range []operation{operationList, operationDeleteCollection} { @@ -205,7 +207,7 @@ const ( operationDeleteCollection operation = "deletecollection" operationList operation = "list" // assume a default estimate for finalizers to complete when found on items pending deletion. - finalizerEstimateSeconds int64 = int64(15) + finalizerEstimateSeconds = int64(15) ) // operationKey is an entry in a cache. @@ -304,12 +306,13 @@ func (d *namespacedResourcesDeleter) finalizeNamespace(namespace *v1.Namespace) // deleteCollection is a helper function that will delete the collection of resources // it returns true if the operation was supported on the server. // it returns an error if the operation was supported on the server but was unable to complete. -func (d *namespacedResourcesDeleter) deleteCollection(gvr schema.GroupVersionResource, namespace string) (bool, error) { - klog.V(5).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr) +func (d *namespacedResourcesDeleter) deleteCollection(ctx context.Context, gvr schema.GroupVersionResource, namespace string) (bool, error) { + logger := klog.FromContext(ctx) + logger.V(5).Info("Namespace controller - deleteCollection", "namespace", namespace, "resource", gvr) key := operationKey{operation: operationDeleteCollection, gvr: gvr} if !d.opCache.isSupported(key) { - klog.V(5).Infof("namespace controller - deleteCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr) + logger.V(5).Info("Namespace controller - deleteCollection ignored since not supported", "namespace", namespace, "resource", gvr) return false, nil } @@ -330,11 +333,11 @@ func (d *namespacedResourcesDeleter) deleteCollection(gvr schema.GroupVersionRes // /apis/extensions/v1beta1/namespaces/default/replicationcontrollers // when working with this resource type, we will get a literal not found error rather than expected method not supported if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) { - klog.V(5).Infof("namespace controller - deleteCollection not supported - namespace: %s, gvr: %v", namespace, gvr) + logger.V(5).Info("Namespace controller - deleteCollection not supported", "namespace", namespace, "resource", gvr) return false, nil } - klog.V(5).Infof("namespace controller - deleteCollection unexpected error - namespace: %s, gvr: %v, error: %v", namespace, gvr, err) + logger.V(5).Info("Namespace controller - deleteCollection unexpected error", "namespace", namespace, "resource", gvr, "err", err) return true, err } @@ -344,12 +347,13 @@ func (d *namespacedResourcesDeleter) deleteCollection(gvr schema.GroupVersionRes // the list of items in the collection (if found) // a boolean if the operation is supported // an error if the operation is supported but could not be completed. -func (d *namespacedResourcesDeleter) listCollection(gvr schema.GroupVersionResource, namespace string) (*metav1.PartialObjectMetadataList, bool, error) { - klog.V(5).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr) +func (d *namespacedResourcesDeleter) listCollection(ctx context.Context, gvr schema.GroupVersionResource, namespace string) (*metav1.PartialObjectMetadataList, bool, error) { + logger := klog.FromContext(ctx) + logger.V(5).Info("Namespace controller - listCollection", "namespace", namespace, "resource", gvr) key := operationKey{operation: operationList, gvr: gvr} if !d.opCache.isSupported(key) { - klog.V(5).Infof("namespace controller - listCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr) + logger.V(5).Info("Namespace controller - listCollection ignored since not supported", "namespace", namespace, "resource", gvr) return nil, false, nil } @@ -364,7 +368,7 @@ func (d *namespacedResourcesDeleter) listCollection(gvr schema.GroupVersionResou // /apis/extensions/v1beta1/namespaces/default/replicationcontrollers // when working with this resource type, we will get a literal not found error rather than expected method not supported if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) { - klog.V(5).Infof("namespace controller - listCollection not supported - namespace: %s, gvr: %v", namespace, gvr) + logger.V(5).Info("Namespace controller - listCollection not supported", "namespace", namespace, "resource", gvr) return nil, false, nil } @@ -372,10 +376,10 @@ func (d *namespacedResourcesDeleter) listCollection(gvr schema.GroupVersionResou } // deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1. -func (d *namespacedResourcesDeleter) deleteEachItem(gvr schema.GroupVersionResource, namespace string) error { - klog.V(5).Infof("namespace controller - deleteEachItem - namespace: %s, gvr: %v", namespace, gvr) +func (d *namespacedResourcesDeleter) deleteEachItem(ctx context.Context, gvr schema.GroupVersionResource, namespace string) error { + klog.FromContext(ctx).V(5).Info("Namespace controller - deleteEachItem", "namespace", namespace, "resource", gvr) - unstructuredList, listSupported, err := d.listCollection(gvr, namespace) + unstructuredList, listSupported, err := d.listCollection(ctx, gvr, namespace) if err != nil { return err } @@ -406,27 +410,29 @@ type gvrDeletionMetadata struct { // It returns an estimate of the time remaining before the remaining resources are deleted. // If estimate > 0, not all resources are guaranteed to be gone. func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource( + ctx context.Context, gvr schema.GroupVersionResource, namespace string, namespaceDeletedAt metav1.Time) (gvrDeletionMetadata, error) { - klog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr) + logger := klog.FromContext(ctx) + logger.V(5).Info("Namespace controller - deleteAllContentForGroupVersionResource", "namespace", namespace, "resource", gvr) // estimate how long it will take for the resource to be deleted (needed for objects that support graceful delete) - estimate, err := d.estimateGracefulTermination(gvr, namespace, namespaceDeletedAt) + estimate, err := d.estimateGracefulTermination(ctx, gvr, namespace, namespaceDeletedAt) if err != nil { - klog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to estimate - namespace: %s, gvr: %v, err: %v", namespace, gvr, err) + logger.V(5).Info("Namespace controller - deleteAllContentForGroupVersionResource - unable to estimate", "namespace", namespace, "resource", gvr, "err", err) return gvrDeletionMetadata{}, err } - klog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate) + logger.V(5).Info("Namespace controller - deleteAllContentForGroupVersionResource - estimate", "namespace", namespace, "resource", gvr, "estimate", estimate) // first try to delete the entire collection - deleteCollectionSupported, err := d.deleteCollection(gvr, namespace) + deleteCollectionSupported, err := d.deleteCollection(ctx, gvr, namespace) if err != nil { return gvrDeletionMetadata{finalizerEstimateSeconds: estimate}, err } // delete collection was not supported, so we list and delete each item... if !deleteCollectionSupported { - err = d.deleteEachItem(gvr, namespace) + err = d.deleteEachItem(ctx, gvr, namespace) if err != nil { return gvrDeletionMetadata{finalizerEstimateSeconds: estimate}, err } @@ -434,16 +440,16 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource( // verify there are no more remaining items // it is not an error condition for there to be remaining items if local estimate is non-zero - klog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr) - unstructuredList, listSupported, err := d.listCollection(gvr, namespace) + logger.V(5).Info("Namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace", "namespace", namespace, "resource", gvr) + unstructuredList, listSupported, err := d.listCollection(ctx, gvr, namespace) if err != nil { - klog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err) + logger.V(5).Info("Namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace", "namespace", namespace, "resource", gvr, "err", err) return gvrDeletionMetadata{finalizerEstimateSeconds: estimate}, err } if !listSupported { return gvrDeletionMetadata{finalizerEstimateSeconds: estimate}, nil } - klog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - items remaining - namespace: %s, gvr: %v, items: %v", namespace, gvr, len(unstructuredList.Items)) + logger.V(5).Info("Namespace controller - deleteAllContentForGroupVersionResource - items remaining", "namespace", namespace, "resource", gvr, "items", len(unstructuredList.Items)) if len(unstructuredList.Items) == 0 { // we're done return gvrDeletionMetadata{finalizerEstimateSeconds: 0, numRemaining: 0}, nil @@ -458,7 +464,7 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource( } if estimate != int64(0) { - klog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate is present - namespace: %s, gvr: %v, finalizers: %v", namespace, gvr, finalizersToNumRemaining) + logger.V(5).Info("Namespace controller - deleteAllContentForGroupVersionResource - estimate is present", "namespace", namespace, "resource", gvr, "finalizers", finalizersToNumRemaining) return gvrDeletionMetadata{ finalizerEstimateSeconds: estimate, numRemaining: len(unstructuredList.Items), @@ -468,7 +474,7 @@ func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource( // if any item has a finalizer, we treat that as a normal condition, and use a default estimation to allow for GC to complete. if len(finalizersToNumRemaining) > 0 { - klog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - items remaining with finalizers - namespace: %s, gvr: %v, finalizers: %v", namespace, gvr, finalizersToNumRemaining) + logger.V(5).Info("Namespace controller - deleteAllContentForGroupVersionResource - items remaining with finalizers", "namespace", namespace, "resource", gvr, "finalizers", finalizersToNumRemaining) return gvrDeletionMetadata{ finalizerEstimateSeconds: finalizerEstimateSeconds, numRemaining: len(unstructuredList.Items), @@ -493,13 +499,14 @@ type allGVRDeletionMetadata struct { // deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources. // It returns an estimate of the time remaining before the remaining resources are deleted. // If estimate > 0, not all resources are guaranteed to be gone. -func (d *namespacedResourcesDeleter) deleteAllContent(ns *v1.Namespace) (int64, error) { +func (d *namespacedResourcesDeleter) deleteAllContent(ctx context.Context, ns *v1.Namespace) (int64, error) { namespace := ns.Name namespaceDeletedAt := *ns.DeletionTimestamp var errs []error conditionUpdater := namespaceConditionUpdater{} estimate := int64(0) - klog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s", namespace) + logger := klog.FromContext(ctx) + logger.V(4).Info("namespace controller - deleteAllContent", "namespace", namespace) resources, err := d.discoverResourcesFn() if err != nil { @@ -521,7 +528,7 @@ func (d *namespacedResourcesDeleter) deleteAllContent(ns *v1.Namespace) (int64, finalizersToNumRemaining: map[string]int{}, } for gvr := range groupVersionResources { - gvrDeletionMetadata, err := d.deleteAllContentForGroupVersionResource(gvr, namespace, namespaceDeletedAt) + gvrDeletionMetadata, err := d.deleteAllContentForGroupVersionResource(ctx, gvr, namespace, namespaceDeletedAt) if err != nil { // If there is an error, hold on to it but proceed with all the remaining // groupVersionResources. @@ -553,19 +560,20 @@ func (d *namespacedResourcesDeleter) deleteAllContent(ns *v1.Namespace) (int64, } // if len(errs)==0, NewAggregate returns nil. - klog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, estimate: %v, errors: %v", namespace, estimate, utilerrors.NewAggregate(errs)) - return estimate, utilerrors.NewAggregate(errs) + err = utilerrors.NewAggregate(errs) + logger.V(4).Info("namespace controller - deleteAllContent", "namespace", namespace, "estimate", estimate, "err", err) + return estimate, err } // estimateGracefulTermination will estimate the graceful termination required for the specific entity in the namespace -func (d *namespacedResourcesDeleter) estimateGracefulTermination(gvr schema.GroupVersionResource, ns string, namespaceDeletedAt metav1.Time) (int64, error) { +func (d *namespacedResourcesDeleter) estimateGracefulTermination(ctx context.Context, gvr schema.GroupVersionResource, ns string, namespaceDeletedAt metav1.Time) (int64, error) { groupResource := gvr.GroupResource() - klog.V(5).Infof("namespace controller - estimateGracefulTermination - group %s, resource: %s", groupResource.Group, groupResource.Resource) + klog.FromContext(ctx).V(5).Info("Namespace controller - estimateGracefulTermination", "group", groupResource.Group, "resource", groupResource.Resource) estimate := int64(0) var err error switch groupResource { case schema.GroupResource{Group: "", Resource: "pods"}: - estimate, err = d.estimateGracefulTerminationForPods(ns) + estimate, err = d.estimateGracefulTerminationForPods(ctx, ns) } if err != nil { return 0, err @@ -580,8 +588,8 @@ func (d *namespacedResourcesDeleter) estimateGracefulTermination(gvr schema.Grou } // estimateGracefulTerminationForPods determines the graceful termination period for pods in the namespace -func (d *namespacedResourcesDeleter) estimateGracefulTerminationForPods(ns string) (int64, error) { - klog.V(5).Infof("namespace controller - estimateGracefulTerminationForPods - namespace %s", ns) +func (d *namespacedResourcesDeleter) estimateGracefulTerminationForPods(ctx context.Context, ns string) (int64, error) { + klog.FromContext(ctx).V(5).Info("Namespace controller - estimateGracefulTerminationForPods", "namespace", ns) estimate := int64(0) podsGetter := d.podsGetter if podsGetter == nil || reflect.ValueOf(podsGetter).IsNil() { diff --git a/pkg/controller/namespace/deletion/namespaced_resources_deleter_test.go b/pkg/controller/namespace/deletion/namespaced_resources_deleter_test.go index c38e329aa9b..e03dc0e1c6d 100644 --- a/pkg/controller/namespace/deletion/namespaced_resources_deleter_test.go +++ b/pkg/controller/namespace/deletion/namespaced_resources_deleter_test.go @@ -38,6 +38,7 @@ import ( metadatafake "k8s.io/client-go/metadata/fake" restclient "k8s.io/client-go/rest" core "k8s.io/client-go/testing" + "k8s.io/klog/v2/ktesting" api "k8s.io/kubernetes/pkg/apis/core" ) @@ -200,8 +201,9 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio fn := func() ([]*metav1.APIResourceList, error) { return resources, testInput.gvrError } - d := NewNamespacedResourcesDeleter(mockClient.CoreV1().Namespaces(), metadataClient, mockClient.CoreV1(), fn, v1.FinalizerKubernetes) - if err := d.Delete(testInput.testNamespace.Name); !matchErrors(err, testInput.expectErrorOnDelete) { + _, ctx := ktesting.NewTestContext(t) + d := NewNamespacedResourcesDeleter(ctx, mockClient.CoreV1().Namespaces(), metadataClient, mockClient.CoreV1(), fn, v1.FinalizerKubernetes) + if err := d.Delete(ctx, testInput.testNamespace.Name); !matchErrors(err, testInput.expectErrorOnDelete) { t.Errorf("expected error %q when syncing namespace, got %q, %v", testInput.expectErrorOnDelete, err, testInput.expectErrorOnDelete == err) } @@ -297,9 +299,10 @@ func TestSyncNamespaceThatIsActive(t *testing.T) { fn := func() ([]*metav1.APIResourceList, error) { return testResources(), nil } - d := NewNamespacedResourcesDeleter(mockClient.CoreV1().Namespaces(), nil, mockClient.CoreV1(), + _, ctx := ktesting.NewTestContext(t) + d := NewNamespacedResourcesDeleter(ctx, mockClient.CoreV1().Namespaces(), nil, mockClient.CoreV1(), fn, v1.FinalizerKubernetes) - err := d.Delete(testNamespace.Name) + err := d.Delete(ctx, testNamespace.Name) if err != nil { t.Errorf("Unexpected error when synching namespace %v", err) } @@ -429,12 +432,12 @@ func TestDeleteEncounters404(t *testing.T) { APIResources: []metav1.APIResource{{Name: "flakes", Namespaced: true, Kind: "Flake", Verbs: []string{"get", "list", "delete", "deletecollection", "create", "update"}}}, }}, nil } - - d := NewNamespacedResourcesDeleter(mockClient.CoreV1().Namespaces(), mockMetadataClient, mockClient.CoreV1(), resourcesFn, v1.FinalizerKubernetes) + _, ctx := ktesting.NewTestContext(t) + d := NewNamespacedResourcesDeleter(ctx, mockClient.CoreV1().Namespaces(), mockMetadataClient, mockClient.CoreV1(), resourcesFn, v1.FinalizerKubernetes) // Delete ns1 and get NotFound errors for the flakes resource mockMetadataClient.ClearActions() - if err := d.Delete(ns1.Name); err != nil { + if err := d.Delete(ctx, ns1.Name); err != nil { t.Fatal(err) } if len(mockMetadataClient.Actions()) != 3 || @@ -449,7 +452,7 @@ func TestDeleteEncounters404(t *testing.T) { // Delete ns2 mockMetadataClient.ClearActions() - if err := d.Delete(ns2.Name); err != nil { + if err := d.Delete(ctx, ns2.Name); err != nil { t.Fatal(err) } if len(mockMetadataClient.Actions()) != 2 || diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index b3003c82a26..c5f80a4aed7 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -17,6 +17,7 @@ limitations under the License. package namespace import ( + "context" "fmt" "time" @@ -63,6 +64,7 @@ type NamespaceController struct { // NewNamespaceController creates a new NamespaceController func NewNamespaceController( + ctx context.Context, kubeClient clientset.Interface, metadataClient metadata.Interface, discoverResourcesFn func() ([]*metav1.APIResourceList, error), @@ -73,7 +75,7 @@ func NewNamespaceController( // create the controller so we can inject the enqueue function namespaceController := &NamespaceController{ queue: workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace"), - namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken), + namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(ctx, kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken), } // configure the namespace informer event handlers @@ -132,15 +134,15 @@ func (nm *NamespaceController) enqueueNamespace(obj interface{}) { // Each namespace can be in the queue at most once. // The system ensures that no two workers can process // the same namespace at the same time. -func (nm *NamespaceController) worker() { - workFunc := func() bool { +func (nm *NamespaceController) worker(ctx context.Context) { + workFunc := func(ctx context.Context) bool { key, quit := nm.queue.Get() if quit { return true } defer nm.queue.Done(key) - err := nm.syncNamespaceFromKey(key.(string)) + err := nm.syncNamespaceFromKey(ctx, key.(string)) if err == nil { // no error, forget this entry and return nm.queue.Forget(key) @@ -149,7 +151,7 @@ func (nm *NamespaceController) worker() { if estimate, ok := err.(*deletion.ResourcesRemainingError); ok { t := estimate.Estimate/2 + 1 - klog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t) + klog.FromContext(ctx).V(4).Info("Content remaining in namespace", "namespace", key, "waitSeconds", t) nm.queue.AddAfter(key, time.Duration(t)*time.Second) } else { // rather than wait for a full resync, re-add the namespace to the queue to be processed @@ -158,9 +160,8 @@ func (nm *NamespaceController) worker() { } return false } - for { - quit := workFunc() + quit := workFunc(ctx) if quit { return @@ -169,39 +170,40 @@ func (nm *NamespaceController) worker() { } // syncNamespaceFromKey looks for a namespace with the specified key in its store and synchronizes it -func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) { +func (nm *NamespaceController) syncNamespaceFromKey(ctx context.Context, key string) (err error) { startTime := time.Now() + logger := klog.FromContext(ctx) defer func() { - klog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Since(startTime)) + logger.V(4).Info("Finished syncing namespace", "namespace", key, "duration", time.Since(startTime)) }() namespace, err := nm.lister.Get(key) if errors.IsNotFound(err) { - klog.Infof("Namespace has been deleted %v", key) + logger.Info("Namespace has been deleted", "namespace", key) return nil } if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err)) return err } - return nm.namespacedResourcesDeleter.Delete(namespace.Name) + return nm.namespacedResourcesDeleter.Delete(ctx, namespace.Name) } // Run starts observing the system with the specified number of workers. -func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) { +func (nm *NamespaceController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer nm.queue.ShutDown() + logger := klog.FromContext(ctx) + logger.Info("Starting namespace controller") + defer logger.Info("Shutting down namespace controller") - klog.Infof("Starting namespace controller") - defer klog.Infof("Shutting down namespace controller") - - if !cache.WaitForNamedCacheSync("namespace", stopCh, nm.listerSynced) { + if !cache.WaitForNamedCacheSync("namespace", ctx.Done(), nm.listerSynced) { return } - klog.V(5).Info("Starting workers of namespace controller") + logger.V(5).Info("Starting workers of namespace controller") for i := 0; i < workers; i++ { - go wait.Until(nm.worker, time.Second, stopCh) + go wait.UntilWithContext(ctx, nm.worker, time.Second) } - <-stopCh + <-ctx.Done() } diff --git a/test/e2e_node/services/internal_services.go b/test/e2e_node/services/internal_services.go index a59022a3850..c9d80058b9f 100644 --- a/test/e2e_node/services/internal_services.go +++ b/test/e2e_node/services/internal_services.go @@ -17,11 +17,13 @@ limitations under the License. package services import ( + "context" "os" "testing" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" "k8s.io/apiserver/pkg/storage/storagebackend" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/test/e2e/framework" "k8s.io/klog/v2" @@ -55,6 +57,7 @@ func (es *e2eServices) run(t *testing.T) error { // start starts the tests embedded services or returns an error. func (es *e2eServices) start(t *testing.T) error { + _, ctx := ktesting.NewTestContext(t) klog.Info("Starting e2e services...") err := es.startEtcd(t) if err != nil { @@ -64,7 +67,7 @@ func (es *e2eServices) start(t *testing.T) error { if err != nil { return err } - err = es.startNamespaceController() + err = es.startNamespaceController(ctx) if err != nil { return nil } @@ -124,10 +127,10 @@ func (es *e2eServices) startAPIServer(etcdStorage *storagebackend.Config) error } // startNamespaceController starts the embedded namespace controller or returns an error. -func (es *e2eServices) startNamespaceController() error { - klog.Info("Starting namespace controller") +func (es *e2eServices) startNamespaceController(ctx context.Context) error { + klog.FromContext(ctx).Info("Starting namespace controller") es.nsController = NewNamespaceController(framework.TestContext.Host) - return es.nsController.Start() + return es.nsController.Start(ctx) } // getServicesHealthCheckURLs returns the health check urls for the internal services. diff --git a/test/e2e_node/services/namespace_controller.go b/test/e2e_node/services/namespace_controller.go index 61fef770307..dc3ddb16029 100644 --- a/test/e2e_node/services/namespace_controller.go +++ b/test/e2e_node/services/namespace_controller.go @@ -17,6 +17,7 @@ limitations under the License. package services import ( + "context" "time" v1 "k8s.io/api/core/v1" @@ -49,7 +50,7 @@ func NewNamespaceController(host string) *NamespaceController { } // Start starts the namespace controller. -func (n *NamespaceController) Start() error { +func (n *NamespaceController) Start(ctx context.Context) error { config := restclient.AddUserAgent(&restclient.Config{ Host: n.host, BearerToken: framework.TestContext.BearerToken, @@ -72,7 +73,9 @@ func (n *NamespaceController) Start() error { } discoverResourcesFn := client.Discovery().ServerPreferredNamespacedResources informerFactory := informers.NewSharedInformerFactory(client, ncResyncPeriod) + nc := namespacecontroller.NewNamespaceController( + ctx, client, metadataClient, discoverResourcesFn, @@ -80,7 +83,7 @@ func (n *NamespaceController) Start() error { ncResyncPeriod, v1.FinalizerKubernetes, ) informerFactory.Start(n.stopCh) - go nc.Run(ncConcurrency, n.stopCh) + go nc.Run(ctx, ncConcurrency) return nil } diff --git a/test/integration/namespace/ns_conditions_test.go b/test/integration/namespace/ns_conditions_test.go index 049ed7face7..b01c6206d1b 100644 --- a/test/integration/namespace/ns_conditions_test.go +++ b/test/integration/namespace/ns_conditions_test.go @@ -35,6 +35,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/metadata" restclient "k8s.io/client-go/rest" + "k8s.io/klog/v2/ktesting" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/test/integration/etcd" @@ -55,10 +56,12 @@ func TestNamespaceCondition(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go nsController.Run(5, stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + informers.Start(ctx.Done()) + go nsController.Run(ctx, 5) data := etcd.GetEtcdStorageDataForNamespace(nsName) podJSON, err := jsonToUnstructured(data[corev1.SchemeGroupVersion.WithResource("pods")].Stub, "v1", "Pod") @@ -124,9 +127,10 @@ func TestNamespaceLabels(t *testing.T) { // Even though nscontroller isn't used in this test, its creation is already // spawning some goroutines. So we need to run it to ensure they won't leak. - stopCh := make(chan struct{}) - close(stopCh) - go nsController.Run(5, stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go nsController.Run(ctx, 5) nsName := "test-namespace-labels-generated" // Create a new namespace w/ no name @@ -192,8 +196,9 @@ func namespaceLifecycleSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, * } discoverResourcesFn := clientSet.Discovery().ServerPreferredNamespacedResources - + _, ctx := ktesting.NewTestContext(t) controller := namespace.NewNamespaceController( + ctx, clientSet, metadataClient, discoverResourcesFn,