diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go
index 500903de4d0..54c4a1dd7df 100644
--- a/cmd/kube-controller-manager/app/core.go
+++ b/cmd/kube-controller-manager/app/core.go
@@ -254,7 +254,11 @@ func startDeviceTaintEvictionController(ctx context.Context, controllerContext C
 		controllerContext.InformerFactory.Resource().V1beta1().DeviceClasses(),
 		controllerName,
 	)
-	go deviceTaintEvictionController.Run(ctx)
+	go func() {
+		if err := deviceTaintEvictionController.Run(ctx); err != nil {
+			klog.FromContext(ctx).Error(err, "Device taint processing leading to Pod eviction failed and is now paused")
+		}
+	}()
 	return nil, true, nil
 }
diff --git a/pkg/controller/devicetainteviction/device_taint_eviction.go b/pkg/controller/devicetainteviction/device_taint_eviction.go
index a2cc9354a58..3e2abb194d7 100644
--- a/pkg/controller/devicetainteviction/device_taint_eviction.go
+++ b/pkg/controller/devicetainteviction/device_taint_eviction.go
@@ -18,6 +18,7 @@ package devicetainteviction
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"slices"
@@ -319,7 +320,8 @@ func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInfo
 }
 
 // Run starts the controller which will run until the context is done.
-func (tc *Controller) Run(ctx context.Context) {
+// An error is returned for startup problems.
+func (tc *Controller) Run(ctx context.Context) error {
 	defer utilruntime.HandleCrash()
 	logger := klog.FromContext(ctx)
 	logger.Info("Starting", "controller", tc.name)
@@ -370,7 +372,7 @@ func (tc *Controller) Run(ctx context.Context) {
 	// mutex serializes event processing.
 	var mutex sync.Mutex
 
-	claimHandler, _ := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	claimHandler, err := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
 			claim, ok := obj.(*resourceapi.ResourceClaim)
 			if !ok {
@@ -409,12 +411,15 @@ func (tc *Controller) Run(ctx context.Context) {
 			tc.handleClaimChange(claim, nil)
 		},
 	})
+	if err != nil {
+		return fmt.Errorf("adding claim event handler: %w", err)
+	}
 	defer func() {
 		_ = tc.claimInformer.Informer().RemoveEventHandler(claimHandler)
 	}()
 	tc.haveSynced = append(tc.haveSynced, claimHandler.HasSynced)
 
-	podHandler, _ := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	podHandler, err := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
 			pod, ok := obj.(*v1.Pod)
 			if !ok {
@@ -453,6 +458,9 @@ func (tc *Controller) Run(ctx context.Context) {
 			tc.handlePodChange(pod, nil)
 		},
 	})
+	if err != nil {
+		return fmt.Errorf("adding pod event handler: %w", err)
+	}
 	defer func() {
 		_ = tc.podInformer.Informer().RemoveEventHandler(podHandler)
 	}()
@@ -467,8 +475,7 @@ func (tc *Controller) Run(ctx context.Context) {
 	}
 	sliceTracker, err := resourceslicetracker.StartTracker(ctx, opts)
 	if err != nil {
-		logger.Info("Failed to initialize ResourceSlice tracker; device taint processing leading to Pod eviction is now paused", "err", err)
-		return
+		return fmt.Errorf("initialize ResourceSlice tracker: %w", err)
 	}
 	tc.haveSynced = append(tc.haveSynced, sliceTracker.HasSynced)
 	defer sliceTracker.Stop()
@@ -478,11 +485,11 @@ func (tc *Controller) Run(ctx context.Context) {
 	// work which might be done as events get emitted for intermediate
 	// state.
 	if !cache.WaitForNamedCacheSyncWithContext(ctx, tc.haveSynced...) {
-		return
+		return errors.New("wait for cache sync timed out")
 	}
 	logger.V(1).Info("Underlying informers have synced")
 
-	_, _ = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	_, err = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
 			slice, ok := obj.(*resourceapi.ResourceSlice)
 			if !ok {
@@ -519,12 +526,16 @@ func (tc *Controller) Run(ctx context.Context) {
 			tc.handleSliceChange(slice, nil)
 		},
 	})
+	if err != nil {
+		return fmt.Errorf("add slice event handler: %w", err)
+	}
 
 	// sliceTracker.AddEventHandler blocked while delivering events for all known
 	// ResourceSlices. Therefore our own state is up-to-date once we get here.
 	tc.hasSynced.Store(1)
 
 	<-ctx.Done()
+	return nil
 }
 
 func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.ResourceClaim) {
@@ -783,12 +794,13 @@ func (tc *Controller) handlePodChange(oldPod, newPod *v1.Pod) {
 
 	// Pods get updated quite frequently. There's no need
 	// to check them again unless something changed regarding
-	// their claims.
+	// their claims or they got scheduled.
 	//
 	// In particular this prevents adding the pod again
 	// directly after the eviction condition got added
 	// to it.
 	if oldPod != nil &&
+		oldPod.Spec.NodeName == newPod.Spec.NodeName &&
 		apiequality.Semantic.DeepEqual(oldPod.Status.ResourceClaimStatuses, newPod.Status.ResourceClaimStatuses) {
 		return
 	}
diff --git a/pkg/controller/devicetainteviction/device_taint_eviction_test.go b/pkg/controller/devicetainteviction/device_taint_eviction_test.go
index ee1b5439fc5..5ef0d8c25e6 100644
--- a/pkg/controller/devicetainteviction/device_taint_eviction_test.go
+++ b/pkg/controller/devicetainteviction/device_taint_eviction_test.go
@@ -318,6 +318,10 @@ var (
 			OwnerReference(podName, podUID+"-other", podKind).
 			UID("other").
 			Obj()
+	unscheduledPodWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
+					UID(podUID).
+					PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}).
+					Obj()
 	podWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
 				UID(podUID).
 				PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}).
@@ -494,6 +498,23 @@ func TestHandlers(t *testing.T) {
 			// At the moment, the code reliably cancels right away.
 			wantEvents: []*v1.Event{cancelPodEviction},
 		},
+		"evict-pod-after-scheduling": {
+			initialState: state{
+				pods:            []*v1.Pod{unscheduledPodWithClaimName},
+				slices:          []*resourceapi.ResourceSlice{sliceTainted, slice2},
+				allocatedClaims: []allocatedClaim{{ResourceClaim: inUseClaim, evictionTime: &taintTime}},
+			},
+			events: []any{
+				// Normally the scheduler shouldn't schedule when there is a taint,
+				// but perhaps it didn't know yet.
+				update(unscheduledPodWithClaimName, podWithClaimName),
+			},
+			finalState: state{
+				slices:          []*resourceapi.ResourceSlice{sliceTainted, slice2},
+				allocatedClaims: []allocatedClaim{{ResourceClaim: inUseClaim, evictionTime: &taintTime}},
+				evicting:        []evictAt{{newObject(podWithClaimName), taintTime.Time}},
+			},
+		},
 		"evict-pod-resourceclaim-unrelated-changes": {
 			initialState: state{
 				pods: []*v1.Pod{podWithClaimName},
@@ -1339,22 +1360,22 @@ func TestEviction(t *testing.T) {
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				controller.Run(tCtx)
+				assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
 			}()
 
 			// Eventually the controller should have synced its informers.
-			require.Eventually(tCtx, func() bool {
+			ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool {
 				return controller.hasSynced.Load() > 0
-			}, 30*time.Second, time.Millisecond, "controller synced")
+			}).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("controller synced"))
 
 			if tt.afterSync != nil {
 				tt.afterSync(tCtx)
 			}
 
 			// Eventually the pod gets deleted (= evicted).
-			assert.Eventually(tCtx, func() bool {
+			ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool {
 				_, err := fakeClientset.CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{})
 				return apierrors.IsNotFound(err)
-			}, 30*time.Second, time.Millisecond, "pod evicted")
+			}).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("pod evicted"))
 
 			pod := pod.DeepCopy()
 			pod.Status.Conditions = []v1.PodCondition{{
@@ -1369,7 +1390,10 @@ func TestEviction(t *testing.T) {
 
 			// Shortly after deletion we should also see updated metrics.
 			// This is the last thing the controller does for a pod.
+			// However, actually creating the event on the server is asynchronous,
+			// so we also have to wait for that.
 			ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error {
+				gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchDeletionEvent())
 				return testPodDeletionsMetrics(controller, 1)
 			}).WithTimeout(30*time.Second).Should(gomega.Succeed(), "pod eviction done")
 
@@ -1450,7 +1474,7 @@ func testCancelEviction(tCtx ktesting.TContext, deletePod bool) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		controller.Run(tCtx)
+		assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
 	}()
 
 	// Eventually the pod gets scheduled for eviction.
@@ -1543,15 +1567,15 @@ func TestParallelPodDeletion(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		controller.Run(tCtx)
+		assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
 	}()
 
 	// Eventually the pod gets deleted, in this test by us.
-	assert.Eventually(tCtx, func() bool {
+	ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool {
 		mutex.Lock()
 		defer mutex.Unlock()
 		return podGets >= 1
-	}, 30*time.Second, time.Millisecond, "pod eviction started")
+	}).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("pod eviction started"))
 
 	// We don't want any events.
 	ktesting.Consistently(tCtx, func(tCtx ktesting.TContext) error {
@@ -1622,11 +1646,12 @@ func TestRetry(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		controller.Run(tCtx)
+		assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
 	}()
 
-	// Eventually the pod gets deleted.
+	// Eventually the pod gets deleted and the event is recorded.
 	ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error {
+		gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchDeletionEvent())
 		return testPodDeletionsMetrics(controller, 1)
 	}).WithTimeout(30*time.Second).Should(gomega.Succeed(), "pod eviction done")
 
@@ -1694,15 +1719,15 @@ func TestEvictionFailure(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		controller.Run(tCtx)
+		assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
 	}()
 
 	// Eventually deletion is attempted a few times.
-	assert.Eventually(tCtx, func() bool {
+	ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) int {
 		mutex.Lock()
 		defer mutex.Unlock()
-		return podDeletions >= retries
-	}, 30*time.Second, time.Millisecond, "pod eviction failed")
+		return podDeletions
+	}).WithTimeout(30*time.Second).Should(gomega.BeNumerically(">=", retries), "pod eviction failed")
 
 	// Now we can check the API calls.
 	ktesting.Consistently(tCtx, func(tCtx ktesting.TContext) error {
diff --git a/pkg/controller/tainteviction/timed_workers.go b/pkg/controller/tainteviction/timed_workers.go
index 732b81bbc79..3bb2e48d0a7 100644
--- a/pkg/controller/tainteviction/timed_workers.go
+++ b/pkg/controller/tainteviction/timed_workers.go
@@ -57,6 +57,7 @@ type TimedWorker struct {
 }
 
 // createWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
+// Returns nil if the work was started immediately and doesn't need a timer.
 func createWorker(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(ctx context.Context, fireAt time.Time, args *WorkArgs) error, clock clock.WithDelayedExecution) *TimedWorker {
 	delay := fireAt.Sub(createdAt)
 	logger := klog.FromContext(ctx)
@@ -90,6 +91,7 @@ func (w *TimedWorker) Cancel() {
 type TimedWorkerQueue struct {
 	sync.Mutex
 	// map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker.
+	// Entries may be nil if the work didn't need a timer and is already running.
 	workers  map[string]*TimedWorker
 	workFunc func(ctx context.Context, fireAt time.Time, args *WorkArgs) error
 	clock    clock.WithDelayedExecution
@@ -145,6 +147,10 @@ func (q *TimedWorkerQueue) UpdateWork(ctx context.Context, args *WorkArgs, creat
 	q.Lock()
 	defer q.Unlock()
 	if worker, exists := q.workers[key]; exists {
+		if worker == nil {
+			logger.V(4).Info("Keeping existing work, already in progress", "item", key)
+			return
+		}
 		if worker.FireAt.Compare(fireAt) == 0 {
 			logger.V(4).Info("Keeping existing work, same time", "item", key, "createTime", worker.CreatedAt, "firedTime", worker.FireAt)
 			return
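
Note on the new Run contract: after this patch, Run returns an error for startup problems and nil once the context is canceled, so callers must consume the result. The core.go hunk above is the real consumer; the following standalone sketch uses hypothetical names (runner, register) that are not part of the patch, and only illustrates the same shape of propagating startup errors while returning nil on graceful shutdown:

package main

import (
	"context"
	"fmt"
	"time"
)

// runner mimics the patched Run contract: a non-nil error signals a
// startup problem; nil means the controller started and then ran
// until its context was canceled.
type runner struct{}

func (r *runner) Run(ctx context.Context) error {
	if err := register(); err != nil {
		// Wrap startup errors so the caller sees which step failed,
		// matching the fmt.Errorf("...: %w", err) style of the patch.
		return fmt.Errorf("adding event handler: %w", err)
	}
	<-ctx.Done() // normal operation until shutdown
	return nil
}

// register stands in for a fallible startup step such as
// Informer().AddEventHandler.
func register() error { return nil }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// Same shape as the startDeviceTaintEvictionController hunk:
	// run in a goroutine and surface startup failures instead of
	// silently discarding them.
	done := make(chan error, 1)
	go func() { done <- (&runner{}).Run(ctx) }()
	if err := <-done; err != nil {
		fmt.Println("device taint eviction controller failed:", err)
	}
}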
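
The nil-entry convention added to TimedWorkerQueue is subtle: createWorker returns nil when the work fires immediately, the key is still recorded for deduplication, and UpdateWork must therefore tolerate nil map entries (in the real code, worker.FireAt would otherwise be a nil dereference). A minimal sketch of the same pattern, assuming simplified types (queue, *time.Timer in place of *TimedWorker) that are not the actual tainteviction implementation:

package main

import (
	"fmt"
	"sync"
	"time"
)

// queue is a pared-down stand-in for TimedWorkerQueue. A nil map entry
// marks work that started immediately and therefore has no timer; the
// key must still be recorded so duplicate requests are deduplicated.
type queue struct {
	mu      sync.Mutex
	workers map[string]*time.Timer
}

func newQueue() *queue {
	return &queue{workers: make(map[string]*time.Timer)}
}

func (q *queue) addWork(key string, delay time.Duration, f func()) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, exists := q.workers[key]; exists {
		return
	}
	if delay <= 0 {
		q.workers[key] = nil // nil sentinel: already running, no timer
		go f()
		return
	}
	q.workers[key] = time.AfterFunc(delay, f)
}

func (q *queue) updateWork(key string, delay time.Duration, f func()) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if timer, exists := q.workers[key]; exists {
		if timer == nil {
			// Same guard as the patched UpdateWork: without it,
			// timer.Stop() below would panic on the nil sentinel.
			fmt.Println("keeping existing work, already in progress:", key)
			return
		}
		timer.Stop()
	}
	if delay <= 0 {
		q.workers[key] = nil
		go f()
		return
	}
	q.workers[key] = time.AfterFunc(delay, f)
}

func main() {
	q := newQueue()
	q.addWork("pod-a", 0, func() { fmt.Println("evicting pod-a") })
	// A second request for the same key hits the nil-sentinel branch.
	q.updateWork("pod-a", time.Second, func() { fmt.Println("evicting pod-a again") })
	time.Sleep(100 * time.Millisecond) // let the goroutine print
}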