namespace controller: use contextual logging

This commit is contained in:
JunYang
2022-10-29 22:34:47 +08:00
committed by 杨军10092085
parent a35650b833
commit f5bd8c86d4
7 changed files with 113 additions and 87 deletions

View File

@@ -17,6 +17,7 @@ limitations under the License.
package namespace
import (
"context"
"fmt"
"time"
@@ -63,6 +64,7 @@ type NamespaceController struct {
// NewNamespaceController creates a new NamespaceController
func NewNamespaceController(
ctx context.Context,
kubeClient clientset.Interface,
metadataClient metadata.Interface,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
@@ -73,7 +75,7 @@ func NewNamespaceController(
// create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{
queue: workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace"),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(ctx, kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken),
}
// configure the namespace informer event handlers
@@ -132,15 +134,15 @@ func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
// Each namespace can be in the queue at most once.
// The system ensures that no two workers can process
// the same namespace at the same time.
func (nm *NamespaceController) worker() {
workFunc := func() bool {
func (nm *NamespaceController) worker(ctx context.Context) {
workFunc := func(ctx context.Context) bool {
key, quit := nm.queue.Get()
if quit {
return true
}
defer nm.queue.Done(key)
err := nm.syncNamespaceFromKey(key.(string))
err := nm.syncNamespaceFromKey(ctx, key.(string))
if err == nil {
// no error, forget this entry and return
nm.queue.Forget(key)
@@ -149,7 +151,7 @@ func (nm *NamespaceController) worker() {
if estimate, ok := err.(*deletion.ResourcesRemainingError); ok {
t := estimate.Estimate/2 + 1
klog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t)
klog.FromContext(ctx).V(4).Info("Content remaining in namespace", "namespace", key, "waitSeconds", t)
nm.queue.AddAfter(key, time.Duration(t)*time.Second)
} else {
// rather than wait for a full resync, re-add the namespace to the queue to be processed
@@ -158,9 +160,8 @@ func (nm *NamespaceController) worker() {
}
return false
}
for {
quit := workFunc()
quit := workFunc(ctx)
if quit {
return
@@ -169,39 +170,40 @@ func (nm *NamespaceController) worker() {
}
// syncNamespaceFromKey looks for a namespace with the specified key in its store and synchronizes it
func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
func (nm *NamespaceController) syncNamespaceFromKey(ctx context.Context, key string) (err error) {
startTime := time.Now()
logger := klog.FromContext(ctx)
defer func() {
klog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Since(startTime))
logger.V(4).Info("Finished syncing namespace", "namespace", key, "duration", time.Since(startTime))
}()
namespace, err := nm.lister.Get(key)
if errors.IsNotFound(err) {
klog.Infof("Namespace has been deleted %v", key)
logger.Info("Namespace has been deleted", "namespace", key)
return nil
}
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err))
return err
}
return nm.namespacedResourcesDeleter.Delete(namespace.Name)
return nm.namespacedResourcesDeleter.Delete(ctx, namespace.Name)
}
// Run starts observing the system with the specified number of workers.
func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
func (nm *NamespaceController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer nm.queue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting namespace controller")
defer logger.Info("Shutting down namespace controller")
klog.Infof("Starting namespace controller")
defer klog.Infof("Shutting down namespace controller")
if !cache.WaitForNamedCacheSync("namespace", stopCh, nm.listerSynced) {
if !cache.WaitForNamedCacheSync("namespace", ctx.Done(), nm.listerSynced) {
return
}
klog.V(5).Info("Starting workers of namespace controller")
logger.V(5).Info("Starting workers of namespace controller")
for i := 0; i < workers; i++ {
go wait.Until(nm.worker, time.Second, stopCh)
go wait.UntilWithContext(ctx, nm.worker, time.Second)
}
<-stopCh
<-ctx.Done()
}