pkg/controller/util support contextual logging (#115049)

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
commit d1aa73312c (parent 1cb334960c)
Author: Ziqi Zhao <zhaoziqi9146@gmail.com>
Date: 2023-03-15 03:38:14 +08:00 (committed by GitHub)
3 changed files with 97 additions and 77 deletions
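
Throughout the diff, helpers stop logging through the global klog calls and instead receive a context.Context, pulling their logger out of it with klog.FromContext. A minimal sketch of the pattern, assuming only k8s.io/klog/v2 (the doWork helper and the "node-lifecycle" logger name are illustrative, not part of this commit):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// doWork stands in for a controller helper: it takes ctx and pulls the
// logger out of it instead of logging through the klog globals.
func doWork(ctx context.Context, nodeName string) {
	logger := klog.FromContext(ctx)
	logger.V(2).Info("Doing work", "node", klog.KRef("", nodeName))
}

func main() {
	// The caller attaches a logger to the context once; every callee
	// that receives ctx inherits it.
	ctx := klog.NewContext(context.Background(), klog.Background().WithName("node-lifecycle"))
	doWork(ctx, "node0")
}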

pkg/controller/nodelifecycle/node_lifecycle_controller.go

@@ -525,7 +525,7 @@ func (nc *Controller) doNodeProcessingPassWorker(ctx context.Context) {
}
// TODO: re-evaluate whether there are any labels that need to be
// reconcile in 1.19. Remove this function if it's no longer necessary.
-if err := nc.reconcileNodeLabels(nodeName); err != nil {
+if err := nc.reconcileNodeLabels(ctx, nodeName); err != nil {
logger.Error(err, "Failed to reconcile labels for node, requeue it", "node", klog.KRef("", nodeName))
// TODO(yujuhong): Add nodeName back to the queue
}
@@ -675,7 +675,7 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
}
for i := range added {
logger.V(1).Info("Controller observed a new Node", "node", klog.KRef("", added[i].Name))
-controllerutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
+controllerutil.RecordNodeEvent(ctx, nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(logger, added[i])
nc.markNodeAsReachable(ctx, added[i])
@@ -683,7 +683,7 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
for i := range deleted {
logger.V(1).Info("Controller observed a Node deletion", "node", klog.KRef("", deleted[i].Name))
-controllerutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
+controllerutil.RecordNodeEvent(ctx, nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
delete(nc.knownNodeSet, deleted[i].Name)
}
@@ -1287,7 +1287,7 @@ func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition)
}
// reconcileNodeLabels reconciles node labels.
-func (nc *Controller) reconcileNodeLabels(nodeName string) error {
+func (nc *Controller) reconcileNodeLabels(ctx context.Context, nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// If node not found, just ignore it.
@@ -1327,7 +1327,7 @@ func (nc *Controller) reconcileNodeLabels(nodeName string) error {
if len(labelsToUpdate) == 0 {
return nil
}
-if !controllerutil.AddOrUpdateLabelsOnNode(nc.kubeClient, labelsToUpdate, node) {
+if !controllerutil.AddOrUpdateLabelsOnNode(ctx, nc.kubeClient, labelsToUpdate, node) {
return fmt.Errorf("failed update labels for node %+v", node)
}
return nil
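
Threading ctx through reconcileNodeLabels and AddOrUpdateLabelsOnNode stays backward compatible for callers that have no context yet: klog.FromContext falls back to the global logger when the context carries none. A small sketch of that fallback, under the same klog/v2 assumption:

package main

import (
	"context"

	"k8s.io/klog/v2"
)

func main() {
	// context.TODO() carries no logger, so FromContext returns the
	// global klog logger (klog.Background()) and output is unchanged.
	logger := klog.FromContext(context.TODO())
	logger.Info("still routed through the global klog backend")
}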

pkg/controller/nodelifecycle/node_lifecycle_controller_test.go

@@ -652,9 +652,10 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) {
},
}
+_, ctx := ktesting.NewTestContext(t)
for _, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
item.fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -673,7 +674,7 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.timeToPass > 0 {
@@ -688,7 +689,7 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
zones := testutil.GetZones(item.fakeNodeHandler)
@@ -702,7 +703,7 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
t.Logf("listed pods %d for node %v", len(pods), value.Value)
-controllerutil.DeletePods(context.TODO(), item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
+controllerutil.DeletePods(ctx, item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
return true, 0
})
} else {
@@ -820,9 +821,10 @@ func TestPodStatusChange(t *testing.T) {
},
}
+_, ctx := ktesting.NewTestContext(t)
for _, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
item.fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -838,7 +840,7 @@ func TestPodStatusChange(t *testing.T) {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.timeToPass > 0 {
@@ -849,7 +851,7 @@ func TestPodStatusChange(t *testing.T) {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
zones := testutil.GetZones(item.fakeNodeHandler)
@@ -861,7 +863,7 @@ func TestPodStatusChange(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
-controllerutil.DeletePods(context.TODO(), item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore)
+controllerutil.DeletePods(ctx, item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore)
return true, 0
})
}
@@ -1102,9 +1104,10 @@ func TestMonitorNodeHealthUpdateStatus(t *testing.T) {
expectedPodStatusUpdate: false,
},
}
+_, ctx := ktesting.NewTestContext(t)
for i, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
item.fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -1120,7 +1123,7 @@ func TestMonitorNodeHealthUpdateStatus(t *testing.T) {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.timeToPass > 0 {
@@ -1129,7 +1132,7 @@ func TestMonitorNodeHealthUpdateStatus(t *testing.T) {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
}
@@ -1646,8 +1649,9 @@ func TestMonitorNodeHealthUpdateNodeAndPodStatusWithLease(t *testing.T) {
for _, item := range testcases {
t.Run(item.description, func(t *testing.T) {
+_, ctx := ktesting.NewTestContext(t)
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
item.fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -1666,7 +1670,7 @@ func TestMonitorNodeHealthUpdateNodeAndPodStatusWithLease(t *testing.T) {
if err := nodeController.syncLeaseStore(item.lease); err != nil {
t.Fatalf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if item.timeToPass > 0 {
@@ -1678,7 +1682,7 @@ func TestMonitorNodeHealthUpdateNodeAndPodStatusWithLease(t *testing.T) {
if err := nodeController.syncLeaseStore(item.newLease); err != nil {
t.Fatalf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}
@@ -1809,9 +1813,10 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) {
},
}
+_, ctx := ktesting.NewTestContext(t)
for i, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
item.fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -1827,7 +1832,7 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err)
}
if item.timeToPass > 0 {
@@ -1836,7 +1841,7 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err)
}
}
@@ -1903,6 +1908,7 @@ func TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize(t *testing.T) {
{workers: 1},
}
+_, ctx := ktesting.NewTestContext(t)
for i, item := range table {
fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
@@ -1912,7 +1918,7 @@
}
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -1930,7 +1936,7 @@ func TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize(t *testing.T) {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err)
}
@@ -1956,7 +1962,7 @@ func TestMonitorNodeHealthMarkPodsNotReadyWithWorkerSize(t *testing.T) {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("Case[%d] unexpected error: %v", i, err)
}
@@ -2113,8 +2119,9 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
for _, item := range table {
t.Run(item.desc, func(t *testing.T) {
+_, ctx := ktesting.NewTestContext(t)
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
item.fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -2136,7 +2143,7 @@ func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
}
@@ -2250,8 +2257,9 @@ func TestApplyNoExecuteTaints(t *testing.T) {
},
}
originalTaint := UnreachableTaintTemplate
+_, ctx := ktesting.NewTestContext(t)
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -2267,11 +2275,11 @@ func TestApplyNoExecuteTaints(t *testing.T) {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
-nodeController.doNoExecuteTaintingPass(context.TODO())
-node0, err := fakeNodeHandler.Get(context.TODO(), "node0", metav1.GetOptions{})
+nodeController.doNoExecuteTaintingPass(ctx)
+node0, err := fakeNodeHandler.Get(ctx, "node0", metav1.GetOptions{})
if err != nil {
t.Errorf("Can't get current node0...")
return
@@ -2279,7 +2287,7 @@ func TestApplyNoExecuteTaints(t *testing.T) {
if !taintutils.TaintExists(node0.Spec.Taints, UnreachableTaintTemplate) {
t.Errorf("Can't find taint %v in %v", originalTaint, node0.Spec.Taints)
}
-node2, err := fakeNodeHandler.Get(context.TODO(), "node2", metav1.GetOptions{})
+node2, err := fakeNodeHandler.Get(ctx, "node2", metav1.GetOptions{})
if err != nil {
t.Errorf("Can't get current node2...")
return
@@ -2290,7 +2298,7 @@ func TestApplyNoExecuteTaints(t *testing.T) {
// Make node3 healthy again.
node2.Status = healthyNodeNewStatus
-_, err = fakeNodeHandler.UpdateStatus(context.TODO(), node2, metav1.UpdateOptions{})
+_, err = fakeNodeHandler.UpdateStatus(ctx, node2, metav1.UpdateOptions{})
if err != nil {
t.Errorf(err.Error())
return
@@ -2298,12 +2306,12 @@ func TestApplyNoExecuteTaints(t *testing.T) {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
-nodeController.doNoExecuteTaintingPass(context.TODO())
+nodeController.doNoExecuteTaintingPass(ctx)
-node2, err = fakeNodeHandler.Get(context.TODO(), "node2", metav1.GetOptions{})
+node2, err = fakeNodeHandler.Get(ctx, "node2", metav1.GetOptions{})
if err != nil {
t.Errorf("Can't get current node2...")
return
@@ -2403,8 +2411,9 @@ func TestApplyNoExecuteTaintsToNodesEnqueueTwice(t *testing.T) {
},
},
}
+_, ctx := ktesting.NewTestContext(t)
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -2421,21 +2430,21 @@ func TestApplyNoExecuteTaintsToNodesEnqueueTwice(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
// 1. monitor node health twice, add untainted node once
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
// 2. mark node0 healthy
-node0, err := fakeNodeHandler.Get(context.TODO(), "node0", metav1.GetOptions{})
+node0, err := fakeNodeHandler.Get(ctx, "node0", metav1.GetOptions{})
if err != nil {
t.Errorf("Can't get current node0...")
return
}
node0.Status = healthyNodeNewStatus
-_, err = fakeNodeHandler.UpdateStatus(context.TODO(), node0, metav1.UpdateOptions{})
+_, err = fakeNodeHandler.UpdateStatus(ctx, node0, metav1.UpdateOptions{})
if err != nil {
t.Errorf(err.Error())
return
@@ -2518,17 +2527,17 @@ func TestApplyNoExecuteTaintsToNodesEnqueueTwice(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
// 3. start monitor node health again, add untainted node twice, construct UniqueQueue with duplicated node cache
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
// 4. do NoExecute taint pass
// when processing with node0, condition.Status is NodeReady, and return true with default case
// then remove the set value and queue value both, the taint job never stuck
-nodeController.doNoExecuteTaintingPass(context.TODO())
+nodeController.doNoExecuteTaintingPass(ctx)
// 5. get node3 and node5, see if it has ready got NoExecute taint
-node3, err := fakeNodeHandler.Get(context.TODO(), "node3", metav1.GetOptions{})
+node3, err := fakeNodeHandler.Get(ctx, "node3", metav1.GetOptions{})
if err != nil {
t.Errorf("Can't get current node3...")
return
@@ -2536,7 +2545,7 @@ func TestApplyNoExecuteTaintsToNodesEnqueueTwice(t *testing.T) {
if !taintutils.TaintExists(node3.Spec.Taints, UnreachableTaintTemplate) || len(node3.Spec.Taints) == 0 {
t.Errorf("Not found taint %v in %v, which should be present in %s", UnreachableTaintTemplate, node3.Spec.Taints, node3.Name)
}
-node5, err := fakeNodeHandler.Get(context.TODO(), "node5", metav1.GetOptions{})
+node5, err := fakeNodeHandler.Get(ctx, "node5", metav1.GetOptions{})
if err != nil {
t.Errorf("Can't get current node5...")
return
@@ -2625,8 +2634,9 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) {
originalTaint := UnreachableTaintTemplate
updatedTaint := NotReadyTaintTemplate
+_, ctx := ktesting.NewTestContext(t)
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -2642,17 +2652,17 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
-nodeController.doNoExecuteTaintingPass(context.TODO())
+nodeController.doNoExecuteTaintingPass(ctx)
-node0, err := fakeNodeHandler.Get(context.TODO(), "node0", metav1.GetOptions{})
+node0, err := fakeNodeHandler.Get(ctx, "node0", metav1.GetOptions{})
if err != nil {
t.Errorf("Can't get current node0...")
return
}
-node1, err := fakeNodeHandler.Get(context.TODO(), "node1", metav1.GetOptions{})
+node1, err := fakeNodeHandler.Get(ctx, "node1", metav1.GetOptions{})
if err != nil {
t.Errorf("Can't get current node1...")
return
@@ -2666,12 +2676,12 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) {
node0.Status = newNodeStatus
node1.Status = healthyNodeNewStatus
-_, err = fakeNodeHandler.UpdateStatus(context.TODO(), node0, metav1.UpdateOptions{})
+_, err = fakeNodeHandler.UpdateStatus(ctx, node0, metav1.UpdateOptions{})
if err != nil {
t.Errorf(err.Error())
return
}
-_, err = fakeNodeHandler.UpdateStatus(context.TODO(), node1, metav1.UpdateOptions{})
+_, err = fakeNodeHandler.UpdateStatus(ctx, node1, metav1.UpdateOptions{})
if err != nil {
t.Errorf(err.Error())
return
@@ -2680,12 +2690,12 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
-nodeController.doNoExecuteTaintingPass(context.TODO())
+nodeController.doNoExecuteTaintingPass(ctx)
-node0, err = fakeNodeHandler.Get(context.TODO(), "node0", metav1.GetOptions{})
+node0, err = fakeNodeHandler.Get(ctx, "node0", metav1.GetOptions{})
if err != nil {
t.Errorf("Can't get current node0...")
return
@@ -2728,8 +2738,9 @@ func TestTaintsNodeByCondition(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
+_, ctx := ktesting.NewTestContext(t)
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -2880,11 +2891,11 @@
}
for _, test := range tests {
-fakeNodeHandler.Update(context.TODO(), test.Node, metav1.UpdateOptions{})
+fakeNodeHandler.Update(ctx, test.Node, metav1.UpdateOptions{})
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.doNoScheduleTaintingPass(context.TODO(), test.Node.Name)
nodeController.doNoScheduleTaintingPass(ctx, test.Node.Name)
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -2930,8 +2941,9 @@ func TestNodeEventGeneration(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
+_, ctx := ktesting.NewTestContext(t)
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -2949,7 +2961,7 @@ func TestNodeEventGeneration(t *testing.T) {
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if err := nodeController.monitorNodeHealth(context.TODO()); err != nil {
+if err := nodeController.monitorNodeHealth(ctx); err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fakeRecorder.Events) != 1 {
@@ -3002,8 +3014,9 @@ func TestReconcileNodeLabels(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
+_, ctx := ktesting.NewTestContext(t)
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -3090,11 +3103,11 @@
}
for _, test := range tests {
-fakeNodeHandler.Update(context.TODO(), test.Node, metav1.UpdateOptions{})
+fakeNodeHandler.Update(ctx, test.Node, metav1.UpdateOptions{})
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Fatalf("unexpected error: %v", err)
}
-nodeController.reconcileNodeLabels(test.Node.Name)
+nodeController.reconcileNodeLabels(ctx, test.Node.Name)
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -3144,8 +3157,9 @@ func TestTryUpdateNodeHealth(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
+_, ctx := ktesting.NewTestContext(t)
nodeController, _ := newNodeLifecycleControllerFromClient(
-context.TODO(),
+ctx,
fakeNodeHandler,
testRateLimiterQPS,
testRateLimiterQPS,
@@ -3317,7 +3331,7 @@ func TestTryUpdateNodeHealth(t *testing.T) {
probeTimestamp: test.node.CreationTimestamp,
readyTransitionTimestamp: test.node.CreationTimestamp,
})
-_, _, currentReadyCondition, err := nodeController.tryUpdateNodeHealth(context.TODO(), test.node)
+_, _, currentReadyCondition, err := nodeController.tryUpdateNodeHealth(ctx, test.node)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
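
The test-side half of the conversion swaps context.TODO() for a context built by k8s.io/klog/v2/ktesting, so log output produced by the code under test is captured per test case. A minimal sketch of that helper in isolation (TestSomething is illustrative, not part of this commit):

package node_test

import (
	"testing"

	"k8s.io/klog/v2/ktesting"
)

func TestSomething(t *testing.T) {
	// NewTestContext returns a logger that writes through t.Log and a
	// ctx carrying that logger; the ctx is what replaces context.TODO()
	// in the hunks above.
	logger, ctx := ktesting.NewTestContext(t)
	logger.Info("shown with go test -v, attributed to this test")
	_ = ctx // pass ctx into the code under test
}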

pkg/controller/util/node/controller_utils.go

@@ -19,6 +19,7 @@ package node
import (
"context"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -27,7 +28,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
-"k8s.io/api/core/v1"
+v1 "k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
appsv1listers "k8s.io/client-go/listers/apps/v1"
utilpod "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -44,9 +45,10 @@
func DeletePods(ctx context.Context, kubeClient clientset.Interface, pods []*v1.Pod, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) {
remaining := false
var updateErrList []error
+logger := klog.FromContext(ctx)
if len(pods) > 0 {
-RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
+RecordNodeEvent(ctx, recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
}
for i := range pods {
@@ -76,7 +78,7 @@ func DeletePods(ctx context.Context, kubeClient clientset.Interface, pods []*v1.
continue
}
-klog.V(2).InfoS("Starting deletion of pod", "pod", klog.KObj(pod))
+logger.V(2).Info("Starting deletion of pod", "pod", klog.KObj(pod))
recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) {
@@ -117,7 +119,8 @@ func SetPodTerminationReason(ctx context.Context, kubeClient clientset.Interface
// MarkPodsNotReady updates ready status of given pods running on
// given node from master return true if success
func MarkPodsNotReady(ctx context.Context, kubeClient clientset.Interface, recorder record.EventRecorder, pods []*v1.Pod, nodeName string) error {
-klog.V(2).InfoS("Update ready status of pods on node", "node", klog.KRef("", nodeName))
+logger := klog.FromContext(ctx)
+logger.V(2).Info("Update ready status of pods on node", "node", klog.KRef("", nodeName))
errs := []error{}
for i := range pods {
@@ -138,14 +141,14 @@ func MarkPodsNotReady(ctx context.Context, kubeClient clientset.Interface, recor
break
}
-klog.V(2).InfoS("Updating ready status of pod to false", "pod", pod.Name)
+logger.V(2).Info("Updating ready status of pod to false", "pod", klog.KObj(pod))
if _, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
if apierrors.IsNotFound(err) {
// NotFound error means that pod was already deleted.
// There is nothing left to do with this pod.
continue
}
-klog.InfoS("Failed to update status for pod", "pod", klog.KObj(pod), "err", err)
+logger.Info("Failed to update status for pod", "pod", klog.KObj(pod), "err", err)
errs = append(errs, err)
}
// record NodeNotReady event after updateStatus to make sure pod still exists
@@ -158,7 +161,8 @@ func MarkPodsNotReady(ctx context.Context, kubeClient clientset.Interface, recor
}
// RecordNodeEvent records a event related to a node.
-func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
+func RecordNodeEvent(ctx context.Context, recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
+logger := klog.FromContext(ctx)
ref := &v1.ObjectReference{
APIVersion: "v1",
Kind: "Node",
@@ -166,7 +170,7 @@ func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype
UID: types.UID(nodeUID),
Namespace: "",
}
-klog.V(2).InfoS("Recording event message for node", "event", event, "node", klog.KRef("", nodeName))
+logger.V(2).Info("Recording event message for node", "event", event, "node", klog.KRef("", nodeName))
recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}
@@ -188,6 +192,7 @@ func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newSta
// SwapNodeControllerTaint returns true in case of success and false
// otherwise.
func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool {
+logger := klog.FromContext(ctx)
for _, taintToAdd := range taintsToAdd {
now := metav1.Now()
taintToAdd.TimeAdded = &now
@@ -203,7 +208,7 @@ func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface
err))
return false
}
-klog.V(4).InfoS("Added taint to node", "taint", taintsToAdd, "node", node.Name)
+logger.V(4).Info("Added taint to node", "taint", taintsToAdd, "node", klog.KRef("", node.Name))
err = controller.RemoveTaintOffNode(ctx, kubeClient, node.Name, node, taintsToRemove...)
if err != nil {
@@ -215,14 +220,15 @@ func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface
err))
return false
}
-klog.V(4).InfoS("Made sure that node has no taint", "node", node.Name, "taint", taintsToRemove)
+logger.V(4).Info("Made sure that node has no taint", "node", klog.KRef("", node.Name), "taint", taintsToRemove)
return true
}
// AddOrUpdateLabelsOnNode updates the labels on the node and returns true on
// success and false on failure.
-func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
+func AddOrUpdateLabelsOnNode(ctx context.Context, kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
+logger := klog.FromContext(ctx)
if err := controller.AddOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate); err != nil {
utilruntime.HandleError(
fmt.Errorf(
@@ -232,7 +238,7 @@ func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, labelsToUpdate map[
err))
return false
}
-klog.V(4).InfoS("Updated labels to node", "label", labelsToUpdate, "node", node.Name)
+logger.V(4).Info("Updated labels to node", "label", labelsToUpdate, "node", klog.KRef("", node.Name))
return true
}
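
One recurring detail in the hunks above: node names are logged as klog.KRef("", name). KRef builds an object reference from a namespace/name pair, and the empty namespace is how cluster-scoped objects such as Nodes appear in structured output. A short sketch (the message text mirrors the hunks; the node name is illustrative):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

func main() {
	logger := klog.FromContext(context.Background())
	// With an empty namespace, KRef renders just the object name, the
	// same shape used for Nodes throughout this commit.
	logger.Info("Recording event message for node", "node", klog.KRef("", "node0"))
}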