diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go index 0fcbfcb0a3c..3781226b82c 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go @@ -85,6 +85,7 @@ type nodeLifecycleController struct { leaseInformer coordinformers.LeaseInformer nodeInformer coreinformers.NodeInformer daemonSetInformer appsinformers.DaemonSetInformer + podInformer coreinformers.PodInformer } func createNodeLease(nodeName string, renewTime metav1.MicroTime) *coordv1.Lease { @@ -121,6 +122,15 @@ func (nc *nodeLifecycleController) syncNodeStore(fakeNodeHandler *testutil.FakeN return nc.nodeInformer.Informer().GetStore().Replace(newElems, "newRV") } +func (nc *nodeLifecycleController) syncPodStore(pod *v1.Pod) error { + if pod == nil { + return nil + } + newElems := make([]interface{}, 0, 1) + newElems = append(newElems, pod) + return nc.podInformer.Informer().GetStore().Replace(newElems, "newRV") +} + func newNodeLifecycleControllerFromClient( ctx context.Context, kubeClient clientset.Interface, @@ -138,11 +148,12 @@ func newNodeLifecycleControllerFromClient( leaseInformer := factory.Coordination().V1().Leases() nodeInformer := factory.Core().V1().Nodes() daemonSetInformer := factory.Apps().V1().DaemonSets() + podInformer := factory.Core().V1().Pods() nc, err := NewNodeLifecycleController( ctx, leaseInformer, - factory.Core().V1().Pods(), + podInformer, nodeInformer, daemonSetInformer, kubeClient, @@ -163,7 +174,7 @@ func newNodeLifecycleControllerFromClient( nc.nodeInformerSynced = alwaysReady nc.daemonSetInformerSynced = alwaysReady - return &nodeLifecycleController{nc, leaseInformer, nodeInformer, daemonSetInformer}, nil + return &nodeLifecycleController{nc, leaseInformer, nodeInformer, daemonSetInformer, podInformer}, nil } func TestMonitorNodeHealth(t *testing.T) { @@ -3557,3 +3568,141 @@ func Test_isNodeExcludedFromDisruptionChecks(t *testing.T) { }) } } + +func TestProcessPodMarkPodNotReady(t *testing.T) { + fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) + + table := []struct { + desc string + fakeNodeHandler *testutil.FakeNodeHandler + pod *v1.Pod + expectedPodStatusUpdate bool + monitorNodeHealth bool + }{ + { + desc: "Do not mark pod as NotReady when the scheduled node's healthy is not gathered yet", + fakeNodeHandler: &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: fakeNow, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionFalse, + LastHeartbeatTime: fakeNow, + LastTransitionTime: fakeNow, + }, + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), + }, + pod: testutil.NewPod("pod0", "node0"), + monitorNodeHealth: false, + expectedPodStatusUpdate: false, + }, + { + desc: "Do not mark pod as NotReady when the scheduled node is ready", + fakeNodeHandler: &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: fakeNow, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + LastHeartbeatTime: fakeNow, + LastTransitionTime: fakeNow, + }, + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), + }, + pod: testutil.NewPod("pod0", "node0"), + monitorNodeHealth: true, + expectedPodStatusUpdate: false, + }, + { + desc: "Pod marked as NotReady when the scheduled node is not ready", + fakeNodeHandler: &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: fakeNow, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionFalse, + LastHeartbeatTime: fakeNow, + LastTransitionTime: fakeNow, + }, + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), + }, + pod: testutil.NewPod("pod0", "node0"), + monitorNodeHealth: true, + expectedPodStatusUpdate: true, + }, + } + + _, ctx := ktesting.NewTestContext(t) + for _, item := range table { + t.Run(item.desc, func(t *testing.T) { + nodeController, _ := newNodeLifecycleControllerFromClient( + ctx, + item.fakeNodeHandler, + testRateLimiterQPS, + testRateLimiterQPS, + testLargeClusterThreshold, + testUnhealthyThreshold, + testNodeMonitorGracePeriod, + testNodeStartupGracePeriod, + testNodeMonitorPeriod, + ) + nodeController.now = func() metav1.Time { return fakeNow } + nodeController.recorder = testutil.NewFakeRecorder() + nodeController.getPodsAssignedToNode = fakeGetPodsAssignedToNode(item.fakeNodeHandler.Clientset) + if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.monitorNodeHealth { + if err := nodeController.monitorNodeHealth(ctx); err != nil { + t.Errorf("unexpected error: %v", err) + } + } + + if err := nodeController.syncPodStore(item.pod); err != nil { + t.Errorf("unexpected error: %v", err) + } + nodeController.podUpdated(nil, item.pod) + nodeController.processPod(ctx, podUpdateItem{name: item.pod.Name, namespace: item.pod.Namespace}) + + podStatusUpdated := false + for _, action := range item.fakeNodeHandler.Actions() { + if action.GetVerb() == "update" && action.GetResource().Resource == "pods" && action.GetSubresource() == "status" { + podStatusUpdated = true + } + } + if podStatusUpdated != item.expectedPodStatusUpdate { + t.Errorf("expect pod status updated to be %v, but got %v", item.expectedPodStatusUpdate, podStatusUpdated) + } + }) + } +} diff --git a/pkg/controller/nodelifecycle/scheduler/rate_limited_queue_test.go b/pkg/controller/nodelifecycle/scheduler/rate_limited_queue_test.go index b5565f361a8..f97c7198ce5 100644 --- a/pkg/controller/nodelifecycle/scheduler/rate_limited_queue_test.go +++ b/pkg/controller/nodelifecycle/scheduler/rate_limited_queue_test.go @@ -39,6 +39,56 @@ func CheckSetEq(lhs, rhs sets.String) bool { return lhs.IsSuperset(rhs) && rhs.IsSuperset(lhs) } +func TestUniqueQueueGet(t *testing.T) { + var tick int64 + now = func() time.Time { + t := time.Unix(tick, 0) + tick++ + return t + } + + queue := UniqueQueue{ + queue: TimedQueue{}, + set: sets.NewString(), + } + queue.Add(TimedValue{Value: "first", UID: "11111", AddedAt: now(), ProcessAt: now()}) + queue.Add(TimedValue{Value: "second", UID: "22222", AddedAt: now(), ProcessAt: now()}) + queue.Add(TimedValue{Value: "third", UID: "33333", AddedAt: now(), ProcessAt: now()}) + + queuePattern := []string{"first", "second", "third"} + if len(queue.queue) != len(queuePattern) { + t.Fatalf("Queue %v should have length %d", queue.queue, len(queuePattern)) + } + if !CheckQueueEq(queuePattern, queue.queue) { + t.Errorf("Invalid queue. Got %v, expected %v", queue.queue, queuePattern) + } + + setPattern := sets.NewString("first", "second", "third") + if len(queue.set) != len(setPattern) { + t.Fatalf("Map %v should have length %d", queue.set, len(setPattern)) + } + if !CheckSetEq(setPattern, queue.set) { + t.Errorf("Invalid map. Got %v, expected %v", queue.set, setPattern) + } + + queue.Get() + queuePattern = []string{"second", "third"} + if len(queue.queue) != len(queuePattern) { + t.Fatalf("Queue %v should have length %d", queue.queue, len(queuePattern)) + } + if !CheckQueueEq(queuePattern, queue.queue) { + t.Errorf("Invalid queue. Got %v, expected %v", queue.queue, queuePattern) + } + + setPattern = sets.NewString("second", "third") + if len(queue.set) != len(setPattern) { + t.Fatalf("Map %v should have length %d", queue.set, len(setPattern)) + } + if !CheckSetEq(setPattern, queue.set) { + t.Errorf("Invalid map. Got %v, expected %v", queue.set, setPattern) + } +} + func TestAddNode(t *testing.T) { evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) evictor.Add("first", "11111") @@ -306,6 +356,12 @@ func TestSwapLimiter(t *testing.T) { if qps != createdQPS { t.Fatalf("QPS does not match create one: %v instead of %v", qps, createdQPS) } + + prev := evictor.limiter + evictor.SwapLimiter(createdQPS) + if prev != evictor.limiter { + t.Fatalf("Limiter should not be swapped if the QPS is the same.") + } } func TestAddAfterTry(t *testing.T) {