DRA taint eviction: improve error handling

There was one error path that led to a "controller has shut down" log
message. Other errors either caused different log entries or were so unlikely
(event handler registration failure!) that they weren't checked at all.

It's clearer to let Run return an error in all cases and then log the
"controller has shut down" error at the call site. This also enables tests to
mark themselves as failed, should that ever happen.
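
For illustration, here is a minimal sketch of the resulting pattern (hypothetical names, not the actual kube-controller-manager code): Run reports startup failures to its caller, the call site owns the "failed and is now paused" log message, and a test can assert on the returned error instead.

package main

import (
	"context"
	"errors"
	"log"
)

// demoController stands in for the real eviction controller (hypothetical type).
type demoController struct {
	startupErr error // injected startup failure, for demonstration only
}

// Run returns an error for startup problems and otherwise blocks until ctx is done.
func (c *demoController) Run(ctx context.Context) error {
	if c.startupErr != nil {
		return c.startupErr
	}
	<-ctx.Done()
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := &demoController{startupErr: errors.New("event handler registration failed")}
	// The caller decides how to react; here it logs, a test would mark itself failed.
	if err := c.Run(ctx); err != nil {
		log.Printf("controller failed and is now paused: %v", err)
	}
}
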
Patrick Ohly 2025-03-20 17:44:38 +01:00
parent 473533adaa
commit ac6e47cb14
3 changed files with 28 additions and 13 deletions

View File

@@ -254,7 +254,11 @@ func startDeviceTaintEvictionController(ctx context.Context, controllerContext C
 		controllerContext.InformerFactory.Resource().V1beta1().DeviceClasses(),
 		controllerName,
 	)
-	go deviceTaintEvictionController.Run(ctx)
+	go func() {
+		if err := deviceTaintEvictionController.Run(ctx); err != nil {
+			klog.FromContext(ctx).Error(err, "Device taint processing leading to Pod eviction failed and is now paused")
+		}
+	}()
 	return nil, true, nil
 }

View File

@@ -18,6 +18,7 @@ package devicetainteviction
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"slices"
@@ -319,7 +320,8 @@ func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInfo
 }
 
 // Run starts the controller which will run until the context is done.
-func (tc *Controller) Run(ctx context.Context) {
+// An error is returned for startup problems.
+func (tc *Controller) Run(ctx context.Context) error {
 	defer utilruntime.HandleCrash()
 	logger := klog.FromContext(ctx)
 	logger.Info("Starting", "controller", tc.name)
@@ -370,7 +372,7 @@ func (tc *Controller) Run(ctx context.Context) {
 	// mutex serializes event processing.
 	var mutex sync.Mutex
-	claimHandler, _ := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	claimHandler, err := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
 			claim, ok := obj.(*resourceapi.ResourceClaim)
 			if !ok {
@@ -409,12 +411,15 @@ func (tc *Controller) Run(ctx context.Context) {
 			tc.handleClaimChange(claim, nil)
 		},
 	})
+	if err != nil {
+		return fmt.Errorf("adding claim event handler: %w", err)
+	}
 	defer func() {
 		_ = tc.claimInformer.Informer().RemoveEventHandler(claimHandler)
 	}()
 	tc.haveSynced = append(tc.haveSynced, claimHandler.HasSynced)
 
-	podHandler, _ := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	podHandler, err := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
 			pod, ok := obj.(*v1.Pod)
 			if !ok {
@@ -453,6 +458,9 @@ func (tc *Controller) Run(ctx context.Context) {
 			tc.handlePodChange(pod, nil)
 		},
 	})
+	if err != nil {
+		return fmt.Errorf("adding pod event handler: %w", err)
+	}
 	defer func() {
 		_ = tc.podInformer.Informer().RemoveEventHandler(podHandler)
 	}()
@@ -467,8 +475,7 @@ func (tc *Controller) Run(ctx context.Context) {
 	}
 	sliceTracker, err := resourceslicetracker.StartTracker(ctx, opts)
 	if err != nil {
-		logger.Info("Failed to initialize ResourceSlice tracker; device taint processing leading to Pod eviction is now paused", "err", err)
-		return
+		return fmt.Errorf("initialize ResourceSlice tracker: %w", err)
 	}
 	tc.haveSynced = append(tc.haveSynced, sliceTracker.HasSynced)
 	defer sliceTracker.Stop()
@@ -478,11 +485,11 @@ func (tc *Controller) Run(ctx context.Context) {
 	// work which might be done as events get emitted for intermediate
 	// state.
 	if !cache.WaitForNamedCacheSyncWithContext(ctx, tc.haveSynced...) {
-		return
+		return errors.New("wait for cache sync timed out")
 	}
 	logger.V(1).Info("Underlying informers have synced")
 
-	_, _ = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	_, err = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
 			slice, ok := obj.(*resourceapi.ResourceSlice)
 			if !ok {
@@ -519,12 +526,16 @@ func (tc *Controller) Run(ctx context.Context) {
 			tc.handleSliceChange(slice, nil)
 		},
 	})
+	if err != nil {
+		return fmt.Errorf("add slice event handler: %w", err)
+	}
 	// sliceTracker.AddEventHandler blocked while delivering events for all known
 	// ResourceSlices. Therefore our own state is up-to-date once we get here.
 	tc.hasSynced.Store(1)
 
 	<-ctx.Done()
+	return nil
 }
 
 func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.ResourceClaim) {

View File

@@ -1339,7 +1339,7 @@ func TestEviction(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		controller.Run(tCtx)
+		assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
 	}()
 
 	// Eventually the controller should have synced it's informers.
@@ -1450,7 +1450,7 @@ func testCancelEviction(tCtx ktesting.TContext, deletePod bool) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		controller.Run(tCtx)
+		assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
 	}()
 
 	// Eventually the pod gets scheduled for eviction.
@@ -1543,7 +1543,7 @@ func TestParallelPodDeletion(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		controller.Run(tCtx)
+		assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
 	}()
 
 	// Eventually the pod gets deleted, in this test by us.
@@ -1622,7 +1622,7 @@ func TestRetry(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		controller.Run(tCtx)
+		assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
 	}()
 
 	// Eventually the pod gets deleted.
@@ -1694,7 +1694,7 @@ func TestEvictionFailure(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		controller.Run(tCtx)
+		assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
 	}()
 
 	// Eventually deletion is attempted a few times.