From ac6e47cb1423b05c5b438d1ff24573b3c26c4412 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 20 Mar 2025 17:44:38 +0100 Subject: [PATCH 1/4] DRA taint eviction: improve error handling There was one error path that led to a "controller has shut down" log message. Other errors caused different log entries or are so unlikely (event handler registration failure!) that they weren't checked at all. It's clearer to let Run return an error in all cases and then log the "controller has shut down" error at the call site. This also enables tests to mark themselves as failed, should that ever happen. --- cmd/kube-controller-manager/app/core.go | 6 ++++- .../device_taint_eviction.go | 25 +++++++++++++------ .../device_taint_eviction_test.go | 10 ++++---- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 500903de4d0..54c4a1dd7df 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -254,7 +254,11 @@ func startDeviceTaintEvictionController(ctx context.Context, controllerContext C controllerContext.InformerFactory.Resource().V1beta1().DeviceClasses(), controllerName, ) - go deviceTaintEvictionController.Run(ctx) + go func() { + if err := deviceTaintEvictionController.Run(ctx); err != nil { + klog.FromContext(ctx).Error(err, "Device taint processing leading to Pod eviction failed and is now paused") + } + }() return nil, true, nil } diff --git a/pkg/controller/devicetainteviction/device_taint_eviction.go b/pkg/controller/devicetainteviction/device_taint_eviction.go index a2cc9354a58..31da9712e8e 100644 --- a/pkg/controller/devicetainteviction/device_taint_eviction.go +++ b/pkg/controller/devicetainteviction/device_taint_eviction.go @@ -18,6 +18,7 @@ package devicetainteviction import ( "context" + "errors" "fmt" "math" "slices" @@ -319,7 +320,8 @@ func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInfo } // Run starts the controller which will run until the context is done. -func (tc *Controller) Run(ctx context.Context) { +// An error is returned for startup problems. +func (tc *Controller) Run(ctx context.Context) error { defer utilruntime.HandleCrash() logger := klog.FromContext(ctx) logger.Info("Starting", "controller", tc.name) @@ -370,7 +372,7 @@ func (tc *Controller) Run(ctx context.Context) { // mutex serializes event processing. var mutex sync.Mutex - claimHandler, _ := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + claimHandler, err := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { claim, ok := obj.(*resourceapi.ResourceClaim) if !ok { @@ -409,12 +411,15 @@ func (tc *Controller) Run(ctx context.Context) { tc.handleClaimChange(claim, nil) }, }) + if err != nil { + return fmt.Errorf("adding claim event handler:%w", err) + } defer func() { _ = tc.claimInformer.Informer().RemoveEventHandler(claimHandler) }() tc.haveSynced = append(tc.haveSynced, claimHandler.HasSynced) - podHandler, _ := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + podHandler, err := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { pod, ok := obj.(*v1.Pod) if !ok { @@ -453,6 +458,9 @@ func (tc *Controller) Run(ctx context.Context) { tc.handlePodChange(pod, nil) }, }) + if err != nil { + return fmt.Errorf("adding pod event handler: %w", err) + } defer func() { _ = tc.podInformer.Informer().RemoveEventHandler(podHandler) }() @@ -467,8 +475,7 @@ func (tc *Controller) Run(ctx context.Context) { } sliceTracker, err := resourceslicetracker.StartTracker(ctx, opts) if err != nil { - logger.Info("Failed to initialize ResourceSlice tracker; device taint processing leading to Pod eviction is now paused", "err", err) - return + return fmt.Errorf("initialize ResourceSlice tracker: %w", err) } tc.haveSynced = append(tc.haveSynced, sliceTracker.HasSynced) defer sliceTracker.Stop() @@ -478,11 +485,11 @@ func (tc *Controller) Run(ctx context.Context) { // work which might be done as events get emitted for intermediate // state. if !cache.WaitForNamedCacheSyncWithContext(ctx, tc.haveSynced...) { - return + return errors.New("wait for cache sync timed out") } logger.V(1).Info("Underlying informers have synced") - _, _ = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { slice, ok := obj.(*resourceapi.ResourceSlice) if !ok { @@ -519,12 +526,16 @@ func (tc *Controller) Run(ctx context.Context) { tc.handleSliceChange(slice, nil) }, }) + if err != nil { + return fmt.Errorf("add slice event handler: %w", err) + } // sliceTracker.AddEventHandler blocked while delivering events for all known // ResourceSlices. Therefore our own state is up-to-date once we get here. tc.hasSynced.Store(1) <-ctx.Done() + return nil } func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.ResourceClaim) { diff --git a/pkg/controller/devicetainteviction/device_taint_eviction_test.go b/pkg/controller/devicetainteviction/device_taint_eviction_test.go index ee1b5439fc5..95c9f626e59 100644 --- a/pkg/controller/devicetainteviction/device_taint_eviction_test.go +++ b/pkg/controller/devicetainteviction/device_taint_eviction_test.go @@ -1339,7 +1339,7 @@ func TestEviction(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - controller.Run(tCtx) + assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed") }() // Eventually the controller should have synced it's informers. @@ -1450,7 +1450,7 @@ func testCancelEviction(tCtx ktesting.TContext, deletePod bool) { wg.Add(1) go func() { defer wg.Done() - controller.Run(tCtx) + assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed") }() // Eventually the pod gets scheduled for eviction. @@ -1543,7 +1543,7 @@ func TestParallelPodDeletion(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - controller.Run(tCtx) + assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed") }() // Eventually the pod gets deleted, in this test by us. @@ -1622,7 +1622,7 @@ func TestRetry(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - controller.Run(tCtx) + assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed") }() // Eventually the pod gets deleted. @@ -1694,7 +1694,7 @@ func TestEvictionFailure(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - controller.Run(tCtx) + assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed") }() // Eventually deletion is attempted a few times. From 5856d3ee6fa672e1a37c1940b6eeb63c330ea451 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 20 Mar 2025 17:59:48 +0100 Subject: [PATCH 2/4] DRA taint eviction: fix waiting in unit test Events get recorded in the apiserver asynchronously, so even if the test knows that the event has been evicted because the pod is deleted, it still has to also check for the event to be recorded. This caused a flake in the "Consistently" check of events. --- .../device_taint_eviction_test.go | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/pkg/controller/devicetainteviction/device_taint_eviction_test.go b/pkg/controller/devicetainteviction/device_taint_eviction_test.go index 95c9f626e59..60f7f2b154d 100644 --- a/pkg/controller/devicetainteviction/device_taint_eviction_test.go +++ b/pkg/controller/devicetainteviction/device_taint_eviction_test.go @@ -1343,18 +1343,18 @@ func TestEviction(t *testing.T) { }() // Eventually the controller should have synced it's informers. - require.Eventually(tCtx, func() bool { + ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool { return controller.hasSynced.Load() > 0 - }, 30*time.Second, time.Millisecond, "controller synced") + }).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("controller synced")) if tt.afterSync != nil { tt.afterSync(tCtx) } // Eventually the pod gets deleted (= evicted). - assert.Eventually(tCtx, func() bool { + ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool { _, err := fakeClientset.CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{}) return apierrors.IsNotFound(err) - }, 30*time.Second, time.Millisecond, "pod evicted") + }).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("pod evicted")) pod := pod.DeepCopy() pod.Status.Conditions = []v1.PodCondition{{ @@ -1369,7 +1369,10 @@ func TestEviction(t *testing.T) { // Shortly after deletion we should also see updated metrics. // This is the last thing the controller does for a pod. + // However, actually creating the event on the server is asynchronous, + // so we also have to wait for that. ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error { + gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchDeletionEvent()) return testPodDeletionsMetrics(controller, 1) }).WithTimeout(30*time.Second).Should(gomega.Succeed(), "pod eviction done") @@ -1547,11 +1550,11 @@ func TestParallelPodDeletion(t *testing.T) { }() // Eventually the pod gets deleted, in this test by us. - assert.Eventually(tCtx, func() bool { + ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool { mutex.Lock() defer mutex.Unlock() return podGets >= 1 - }, 30*time.Second, time.Millisecond, "pod eviction started") + }).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("pod eviction started")) // We don't want any events. ktesting.Consistently(tCtx, func(tCtx ktesting.TContext) error { @@ -1625,8 +1628,9 @@ func TestRetry(t *testing.T) { assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed") }() - // Eventually the pod gets deleted. + // Eventually the pod gets deleted and the event is recorded. ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error { + gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchDeletionEvent()) return testPodDeletionsMetrics(controller, 1) }).WithTimeout(30*time.Second).Should(gomega.Succeed(), "pod eviction done") @@ -1698,11 +1702,11 @@ func TestEvictionFailure(t *testing.T) { }() // Eventually deletion is attempted a few times. - assert.Eventually(tCtx, func() bool { + ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) int { mutex.Lock() defer mutex.Unlock() - return podDeletions >= retries - }, 30*time.Second, time.Millisecond, "pod eviction failed") + return podDeletions + }).WithTimeout(30*time.Second).Should(gomega.BeNumerically(">=", retries), "pod eviction failed") // Now we can check the API calls. ktesting.Consistently(tCtx, func(tCtx ktesting.TContext) error { From 56adcd06f37a75696556b7a5209a4ca30cda6700 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 20 Mar 2025 18:15:58 +0100 Subject: [PATCH 3/4] DRA device eviction: fix eviction triggered by pod scheduling Normally the scheduler shouldn't schedule when there is a taint, but perhaps it didn't know yet. The TestEviction/update test covered this, but only failed under the right timing conditions. The new event handler test case covers it reliably. --- .../device_taint_eviction.go | 3 ++- .../device_taint_eviction_test.go | 21 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/pkg/controller/devicetainteviction/device_taint_eviction.go b/pkg/controller/devicetainteviction/device_taint_eviction.go index 31da9712e8e..3e2abb194d7 100644 --- a/pkg/controller/devicetainteviction/device_taint_eviction.go +++ b/pkg/controller/devicetainteviction/device_taint_eviction.go @@ -794,12 +794,13 @@ func (tc *Controller) handlePodChange(oldPod, newPod *v1.Pod) { // Pods get updated quite frequently. There's no need // to check them again unless something changed regarding - // their claims. + // their claims or they got scheduled. // // In particular this prevents adding the pod again // directly after the eviction condition got added // to it. if oldPod != nil && + oldPod.Spec.NodeName == newPod.Spec.NodeName && apiequality.Semantic.DeepEqual(oldPod.Status.ResourceClaimStatuses, newPod.Status.ResourceClaimStatuses) { return } diff --git a/pkg/controller/devicetainteviction/device_taint_eviction_test.go b/pkg/controller/devicetainteviction/device_taint_eviction_test.go index 60f7f2b154d..5ef0d8c25e6 100644 --- a/pkg/controller/devicetainteviction/device_taint_eviction_test.go +++ b/pkg/controller/devicetainteviction/device_taint_eviction_test.go @@ -318,6 +318,10 @@ var ( OwnerReference(podName, podUID+"-other", podKind). UID("other"). Obj() + unscheduledPodWithClaimName = st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}). + Obj() podWithClaimName = st.MakePod().Name(podName).Namespace(namespace). UID(podUID). PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}). @@ -494,6 +498,23 @@ func TestHandlers(t *testing.T) { // At the moment, the code reliably cancels right away. wantEvents: []*v1.Event{cancelPodEviction}, }, + "evict-pod-after-scheduling": { + initialState: state{ + pods: []*v1.Pod{unscheduledPodWithClaimName}, + slices: []*resourceapi.ResourceSlice{sliceTainted, slice2}, + allocatedClaims: []allocatedClaim{{ResourceClaim: inUseClaim, evictionTime: &taintTime}}, + }, + events: []any{ + // Normally the scheduler shouldn't schedule when there is a taint, + // but perhaps it didn't know yet. + update(unscheduledPodWithClaimName, podWithClaimName), + }, + finalState: state{ + slices: []*resourceapi.ResourceSlice{sliceTainted, slice2}, + allocatedClaims: []allocatedClaim{{ResourceClaim: inUseClaim, evictionTime: &taintTime}}, + evicting: []evictAt{{newObject(podWithClaimName), taintTime.Time}}, + }, + }, "evict-pod-resourceclaim-unrelated-changes": { initialState: state{ pods: []*v1.Pod{podWithClaimName}, From cfb9486417d28304049a97cd4464dd8a9069406d Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 20 Mar 2025 18:20:59 +0100 Subject: [PATCH 4/4] DRA taint eviction: avoid nil panic The timed worker queue actually can have nil entries in its map if the work was kicked off immediately. This looks like an unnecessary special case (it would be fine to call AfterFunc with a duration <= 0 and it would do the right thing), but to avoid more sweeping changes the fix consists of documenting this special behavior and adding a nil check. --- pkg/controller/tainteviction/timed_workers.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/controller/tainteviction/timed_workers.go b/pkg/controller/tainteviction/timed_workers.go index 732b81bbc79..3bb2e48d0a7 100644 --- a/pkg/controller/tainteviction/timed_workers.go +++ b/pkg/controller/tainteviction/timed_workers.go @@ -57,6 +57,7 @@ type TimedWorker struct { } // createWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`. +// Returns nil if the work was started immediately and doesn't need a timer. func createWorker(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(ctx context.Context, fireAt time.Time, args *WorkArgs) error, clock clock.WithDelayedExecution) *TimedWorker { delay := fireAt.Sub(createdAt) logger := klog.FromContext(ctx) @@ -90,6 +91,7 @@ func (w *TimedWorker) Cancel() { type TimedWorkerQueue struct { sync.Mutex // map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker. + // Entries may be nil if the work didn't need a timer and is already running. workers map[string]*TimedWorker workFunc func(ctx context.Context, fireAt time.Time, args *WorkArgs) error clock clock.WithDelayedExecution @@ -145,6 +147,10 @@ func (q *TimedWorkerQueue) UpdateWork(ctx context.Context, args *WorkArgs, creat q.Lock() defer q.Unlock() if worker, exists := q.workers[key]; exists { + if worker == nil { + logger.V(4).Info("Keeping existing work, already in progress", "item", key) + return + } if worker.FireAt.Compare(fireAt) == 0 { logger.V(4).Info("Keeping existing work, same time", "item", key, "createTime", worker.CreatedAt, "firedTime", worker.FireAt) return