Merge pull request #113916 from songxiao-wang87/runwxs-test1

Migrate ttl_controller to contextual logging
commit 471b392f43
Kubernetes Prow Robot authored 2023-03-07 04:18:30 -08:00, committed via GitHub
4 changed files with 34 additions and 22 deletions
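For context: contextual logging (KEP-3077) threads a logr.Logger through a context.Context instead of calling the global klog functions, so each controller's output can be named, filtered, and captured in tests. A minimal sketch of the pattern this commit applies — runController and doWork are illustrative names, not part of the diff:

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// runController mirrors what startTTLController now does: derive a named
// child logger from the incoming context and store it back for callees.
func runController(ctx context.Context) {
	ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "ttl"))
	doWork(ctx)
}

// doWork retrieves the logger from the context instead of using klog globals.
func doWork(ctx context.Context) {
	logger := klog.FromContext(ctx)
	logger.Info("Starting work") // output carries the "ttl" logger name
}

func main() {
	runController(context.Background())
}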

cmd/kube-controller-manager/app/core.go

@@ -490,7 +490,9 @@ func startServiceAccountController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
 }
 
 func startTTLController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
+	ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "ttl"))
 	go ttlcontroller.NewTTLController(
+		ctx,
 		controllerContext.InformerFactory.Core().V1().Nodes(),
 		controllerContext.ClientBuilder.ClientOrDie("ttl-controller"),
 	).Run(ctx, 5)
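Because the named logger is attached to the context once here, every log line the TTL controller emits downstream carries the "ttl" name without the controller knowing about it; Run and the event handlers only ever see the context or a plain klog.Logger.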

pkg/controller/ttl/ttl_controller.go

@@ -78,15 +78,19 @@ type Controller struct {
 }
 
 // NewTTLController creates a new TTLController
-func NewTTLController(nodeInformer informers.NodeInformer, kubeClient clientset.Interface) *Controller {
+func NewTTLController(ctx context.Context, nodeInformer informers.NodeInformer, kubeClient clientset.Interface) *Controller {
 	ttlc := &Controller{
 		kubeClient: kubeClient,
 		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttlcontroller"),
 	}
+	logger := klog.FromContext(ctx)
 
 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    ttlc.addNode,
-		UpdateFunc: ttlc.updateNode,
+		AddFunc: func(obj interface{}) {
+			ttlc.addNode(logger, obj)
+		},
+		UpdateFunc: func(old, newObj interface{}) {
+			ttlc.updateNode(logger, old, newObj)
+		},
 		DeleteFunc: ttlc.deleteNode,
 	})
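The anonymous wrappers are needed because cache.ResourceEventHandlerFuncs fixes the handler signatures (AddFunc takes only the object), so the logger has to be captured by closure. A self-contained sketch of the same adaptation, with handleAdd as an illustrative stand-in for addNode:

package main

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// handleAdd takes an extra logger argument, like the new addNode signature.
func handleAdd(logger klog.Logger, obj interface{}) {
	logger.Info("observed add")
}

func main() {
	logger := klog.Background() // stands in for klog.FromContext(ctx)
	// AddFunc must be func(obj interface{}); the closure threads the
	// logger through without the informer machinery knowing about it.
	_ = cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			handleAdd(logger, obj)
		},
	}
}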
@@ -116,9 +120,9 @@ var (
 func (ttlc *Controller) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer ttlc.queue.ShutDown()
-
-	klog.Infof("Starting TTL controller")
-	defer klog.Infof("Shutting down TTL controller")
+	logger := klog.FromContext(ctx)
+	logger.Info("Starting TTL controller")
+	defer logger.Info("Shutting down TTL controller")
 
 	if !cache.WaitForNamedCacheSync("TTL", ctx.Done(), ttlc.hasSynced) {
 		return
@@ -131,7 +135,7 @@ func (ttlc *Controller) Run(ctx context.Context, workers int) {
 	<-ctx.Done()
 }
 
-func (ttlc *Controller) addNode(obj interface{}) {
+func (ttlc *Controller) addNode(logger klog.Logger, obj interface{}) {
 	node, ok := obj.(*v1.Node)
 	if !ok {
 		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
@@ -147,10 +151,10 @@ func (ttlc *Controller) addNode(obj interface{}) {
 			ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
 		}
 	}()
-	ttlc.enqueueNode(node)
+	ttlc.enqueueNode(logger, node)
 }
 
-func (ttlc *Controller) updateNode(_, newObj interface{}) {
+func (ttlc *Controller) updateNode(logger klog.Logger, _, newObj interface{}) {
 	node, ok := newObj.(*v1.Node)
 	if !ok {
 		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
@@ -161,7 +165,7 @@ func (ttlc *Controller) updateNode(_, newObj interface{}) {
 	// We are relying on the fact that Kubelet is updating node status
 	// every 10s (or generally every X seconds), which means that whenever
 	// required, its ttl annotation should be updated within that period.
-	ttlc.enqueueNode(node)
+	ttlc.enqueueNode(logger, node)
 }
 
 func (ttlc *Controller) deleteNode(obj interface{}) {
@@ -191,10 +195,10 @@ func (ttlc *Controller) deleteNode(obj interface{}) {
 	// We are not processing the node, as it no longer exists.
 }
 
-func (ttlc *Controller) enqueueNode(node *v1.Node) {
+func (ttlc *Controller) enqueueNode(logger klog.Logger, node *v1.Node) {
 	key, err := controller.KeyFunc(node)
 	if err != nil {
-		klog.Errorf("Couldn't get key for object %+v", node)
+		logger.Error(nil, "Couldn't get key for object", "object", klog.KObj(node))
 		return
 	}
 	ttlc.queue.Add(key)
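In logr, Error takes the error value as its first argument; passing nil, as enqueueNode now does, records an error-level event that has no underlying error object, and klog.KObj(node) renders the object as a structured name reference instead of dumping the whole struct with %+v. A small standalone sketch of both forms:

package main

import (
	"errors"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
)

func main() {
	logger := klog.Background()
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

	// No underlying error value, as in enqueueNode above.
	logger.Error(nil, "Couldn't get key for object", "object", klog.KObj(node))

	// With an actual error value, for comparison.
	logger.Error(errors.New("patch failed"), "Failed to update node", "node", klog.KObj(node))
}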
@@ -229,7 +233,7 @@ func (ttlc *Controller) getDesiredTTLSeconds() int {
 	return ttlc.desiredTTLSeconds
 }
 
-func getIntFromAnnotation(node *v1.Node, annotationKey string) (int, bool) {
+func getIntFromAnnotation(ctx context.Context, node *v1.Node, annotationKey string) (int, bool) {
 	if node.Annotations == nil {
 		return 0, false
 	}
@@ -239,8 +243,9 @@ func getIntFromAnnotation(node *v1.Node, annotationKey string) (int, bool) {
 	}
 	intValue, err := strconv.Atoi(annotationValue)
 	if err != nil {
-		klog.Warningf("Cannot convert the value %q with annotation key %q for the node %q",
-			annotationValue, annotationKey, node.Name)
+		logger := klog.FromContext(ctx)
+		logger.Info("Could not convert the value with annotation key for the node", "annotationValue",
+			annotationValue, "annotationKey", annotationKey, "node", klog.KObj(node))
 		return 0, false
 	}
 	return intValue, true
@@ -268,11 +273,12 @@ func (ttlc *Controller) patchNodeWithAnnotation(ctx context.Context, node *v1.Node, annotationKey string, value int) error {
 		return err
 	}
 	_, err = ttlc.kubeClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
+	logger := klog.FromContext(ctx)
 	if err != nil {
-		klog.V(2).InfoS("Failed to change ttl annotation for node", "node", klog.KObj(node), "err", err)
+		logger.V(2).Info("Failed to change ttl annotation for node", "node", klog.KObj(node), "err", err)
 		return err
 	}
-	klog.V(2).InfoS("Changed ttl annotation", "node", klog.KObj(node), "TTL", time.Duration(value)*time.Second)
+	logger.V(2).Info("Changed ttl annotation", "node", klog.KObj(node), "TTL", time.Duration(value)*time.Second)
 	return nil
 }
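logger.V(2).Info is the contextual equivalent of klog.V(2).InfoS: the line is emitted only when the effective verbosity is at least 2. A minimal sketch of the gating behaviour:

package main

import (
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil)    // register the usual -v flag
	_ = flag.Set("v", "2") // same effect as running with -v=2
	flag.Parse()

	logger := klog.Background()
	logger.V(2).Info("Changed ttl annotation") // emitted at -v=2 and above
	logger.V(4).Info("suppressed")             // dropped at -v=2
}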
@@ -286,7 +292,7 @@ func (ttlc *Controller) updateNodeIfNeeded(ctx context.Context, key string) error {
 	}
 
 	desiredTTL := ttlc.getDesiredTTLSeconds()
-	currentTTL, ok := getIntFromAnnotation(node, v1.ObjectTTLAnnotationKey)
+	currentTTL, ok := getIntFromAnnotation(ctx, node, v1.ObjectTTLAnnotationKey)
 	if ok && currentTTL == desiredTTL {
 		return nil
 	}

pkg/controller/ttl/ttl_controller_test.go

@@ -27,6 +27,7 @@ import (
 	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2/ktesting"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -235,7 +236,8 @@ func TestDesiredTTL(t *testing.T) {
 			boundaryStep: testCase.boundaryStep,
 		}
 		if testCase.addNode {
-			ttlController.addNode(&v1.Node{})
+			logger, _ := ktesting.NewTestContext(t)
+			ttlController.addNode(logger, &v1.Node{})
 		}
 		if testCase.deleteNode {
 			ttlController.deleteNode(&v1.Node{})
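ktesting.NewTestContext returns a logger plus a context carrying it, with all output routed through testing.T so log lines appear alongside the test that produced them. A sketch of typical usage (TestExample is illustrative, not from this diff):

package ttl_test

import (
	"testing"

	"k8s.io/klog/v2/ktesting"
)

func TestExample(t *testing.T) {
	logger, ctx := ktesting.NewTestContext(t)

	// Pass logger to call sites that take a klog.Logger ...
	logger.Info("routed through t.Log")

	// ... and ctx to call sites that call klog.FromContext(ctx).
	_ = ctx
}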

test/integration/ttlcontroller/ttlcontroller_test.go

@@ -32,6 +32,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	listers "k8s.io/client-go/listers/core/v1"
 	restclient "k8s.io/client-go/rest"
+	"k8s.io/klog/v2/ktesting"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 	"k8s.io/kubernetes/pkg/controller/ttl"
 	"k8s.io/kubernetes/test/integration/framework"
@@ -137,9 +138,10 @@ func TestTTLAnnotations(t *testing.T) {
 	testClient, informers := createClientAndInformers(t, server)
 	nodeInformer := informers.Core().V1().Nodes()
-	ttlc := ttl.NewTTLController(nodeInformer, testClient)
-	ctx, cancel := context.WithCancel(context.Background())
+	_, ctx := ktesting.NewTestContext(t)
+	ttlc := ttl.NewTTLController(ctx, nodeInformer, testClient)
+	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	go nodeInformer.Informer().Run(ctx.Done())
 	go ttlc.Run(ctx, 1)
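Note the design choice in the last hunk: the integration test derives its cancelable context from the ktesting context rather than context.Background(), so the per-test logger keeps flowing into NewTTLController and Run.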