Merge pull request #130947 from pohly/dra-device-taints-flake

DRA device taints: fix some race conditions
Authored by Kubernetes Prow Robot on 2025-03-20 14:16:55 -07:00, committed by GitHub
commit b0d6079ddc
4 changed files with 71 additions and 24 deletions


@@ -254,7 +254,11 @@ func startDeviceTaintEvictionController(ctx context.Context, controllerContext C
controllerContext.InformerFactory.Resource().V1beta1().DeviceClasses(),
controllerName,
)
go deviceTaintEvictionController.Run(ctx)
go func() {
if err := deviceTaintEvictionController.Run(ctx); err != nil {
klog.FromContext(ctx).Error(err, "Device taint processing leading to Pod eviction failed and is now paused")
}
}()
return nil, true, nil
}
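The hunk above stops discarding the controller's startup error: Run now returns an error, and the wrapping goroutine logs the failure instead of silently dropping it. A minimal, self-contained sketch of that wiring pattern (the `dummyController` type is an illustrative stand-in, not code from this PR):

```go
package main

import (
	"context"
	"errors"
	"time"

	"k8s.io/klog/v2"
)

// dummyController stands in for any controller whose Run returns an error
// only for startup problems and otherwise blocks until the context is done.
type dummyController struct {
	failStartup bool
}

func (c *dummyController) Run(ctx context.Context) error {
	if c.failStartup {
		return errors.New("startup failed")
	}
	<-ctx.Done()
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	c := &dummyController{}
	// Same pattern as above: run in a goroutine and log the error
	// instead of ignoring the return value.
	go func() {
		if err := c.Run(ctx); err != nil {
			klog.FromContext(ctx).Error(err, "Controller failed and is now paused")
		}
	}()

	<-ctx.Done()
}
```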


@@ -18,6 +18,7 @@ package devicetainteviction
import (
"context"
"errors"
"fmt"
"math"
"slices"
@@ -319,7 +320,8 @@ func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInfo
}
// Run starts the controller which will run until the context is done.
func (tc *Controller) Run(ctx context.Context) {
// An error is returned for startup problems.
func (tc *Controller) Run(ctx context.Context) error {
defer utilruntime.HandleCrash()
logger := klog.FromContext(ctx)
logger.Info("Starting", "controller", tc.name)
@@ -370,7 +372,7 @@ func (tc *Controller) Run(ctx context.Context) {
// mutex serializes event processing.
var mutex sync.Mutex
claimHandler, _ := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
claimHandler, err := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
claim, ok := obj.(*resourceapi.ResourceClaim)
if !ok {
@@ -409,12 +411,15 @@ func (tc *Controller) Run(ctx context.Context) {
tc.handleClaimChange(claim, nil)
},
})
if err != nil {
return fmt.Errorf("adding claim event handler:%w", err)
}
defer func() {
_ = tc.claimInformer.Informer().RemoveEventHandler(claimHandler)
}()
tc.haveSynced = append(tc.haveSynced, claimHandler.HasSynced)
podHandler, _ := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
podHandler, err := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
pod, ok := obj.(*v1.Pod)
if !ok {
@@ -453,6 +458,9 @@ func (tc *Controller) Run(ctx context.Context) {
tc.handlePodChange(pod, nil)
},
})
if err != nil {
return fmt.Errorf("adding pod event handler: %w", err)
}
defer func() {
_ = tc.podInformer.Informer().RemoveEventHandler(podHandler)
}()
@@ -467,8 +475,7 @@ func (tc *Controller) Run(ctx context.Context) {
}
sliceTracker, err := resourceslicetracker.StartTracker(ctx, opts)
if err != nil {
logger.Info("Failed to initialize ResourceSlice tracker; device taint processing leading to Pod eviction is now paused", "err", err)
return
return fmt.Errorf("initialize ResourceSlice tracker: %w", err)
}
tc.haveSynced = append(tc.haveSynced, sliceTracker.HasSynced)
defer sliceTracker.Stop()
@@ -478,11 +485,11 @@ func (tc *Controller) Run(ctx context.Context) {
// work which might be done as events get emitted for intermediate
// state.
if !cache.WaitForNamedCacheSyncWithContext(ctx, tc.haveSynced...) {
return
return errors.New("wait for cache sync timed out")
}
logger.V(1).Info("Underlying informers have synced")
_, _ = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
@@ -519,12 +526,16 @@ func (tc *Controller) Run(ctx context.Context) {
tc.handleSliceChange(slice, nil)
},
})
if err != nil {
return fmt.Errorf("add slice event handler: %w", err)
}
// sliceTracker.AddEventHandler blocked while delivering events for all known
// ResourceSlices. Therefore our own state is up-to-date once we get here.
tc.hasSynced.Store(1)
<-ctx.Done()
return nil
}
func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.ResourceClaim) {
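The Run changes above follow one pattern throughout: check the error returned by AddEventHandler, unregister the handler on exit, and only start real work after the caches have synced. A self-contained sketch of that registration pattern against a fake clientset (the `runWithHandler` helper and its use of `WaitForCacheSync` are illustrative; the real controller uses `WaitForNamedCacheSyncWithContext` and its own informers):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

// runWithHandler registers a pod event handler, checks the registration
// error, removes the handler on exit, and waits for the cache to sync
// before blocking until the context is done.
func runWithHandler(ctx context.Context, podInformer cache.SharedIndexInformer) error {
	handler, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj any) {
			if pod, ok := obj.(*v1.Pod); ok {
				fmt.Println("observed pod:", pod.Name)
			}
		},
	})
	if err != nil {
		return fmt.Errorf("adding pod event handler: %w", err)
	}
	defer func() {
		_ = podInformer.RemoveEventHandler(handler)
	}()

	if !cache.WaitForCacheSync(ctx.Done(), handler.HasSynced) {
		return errors.New("wait for cache sync timed out")
	}
	<-ctx.Done()
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()
	factory.Start(ctx.Done())

	if err := runWithHandler(ctx, podInformer); err != nil {
		fmt.Println("controller failed:", err)
	}
}
```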
@@ -783,12 +794,13 @@ func (tc *Controller) handlePodChange(oldPod, newPod *v1.Pod) {
// Pods get updated quite frequently. There's no need
// to check them again unless something changed regarding
// their claims.
// their claims or they got scheduled.
//
// In particular this prevents adding the pod again
// directly after the eviction condition got added
// to it.
if oldPod != nil &&
oldPod.Spec.NodeName == newPod.Spec.NodeName &&
apiequality.Semantic.DeepEqual(oldPod.Status.ResourceClaimStatuses, newPod.Status.ResourceClaimStatuses) {
return
}
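The extra `oldPod.Spec.NodeName` comparison above is the actual race fix in handlePodChange: a pod that was unscheduled when first observed must be re-evaluated once it lands on a node, not only when its claim statuses change. A small sketch of that filter in isolation (`podNeedsRecheck` is an illustrative name, not a function in the controller):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
)

// podNeedsRecheck reports whether a pod update is interesting: the pod got
// (re)scheduled onto a node, or its resource claim statuses changed.
func podNeedsRecheck(oldPod, newPod *v1.Pod) bool {
	if oldPod == nil {
		return true // add event, always process
	}
	return oldPod.Spec.NodeName != newPod.Spec.NodeName ||
		!apiequality.Semantic.DeepEqual(oldPod.Status.ResourceClaimStatuses, newPod.Status.ResourceClaimStatuses)
}

func main() {
	unscheduled := &v1.Pod{}
	scheduled := unscheduled.DeepCopy()
	scheduled.Spec.NodeName = "node-1"

	fmt.Println(podNeedsRecheck(unscheduled, unscheduled.DeepCopy())) // false: nothing relevant changed
	fmt.Println(podNeedsRecheck(unscheduled, scheduled))              // true: the pod got scheduled
}
```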


@@ -318,6 +318,10 @@ var (
OwnerReference(podName, podUID+"-other", podKind).
UID("other").
Obj()
unscheduledPodWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
UID(podUID).
PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}).
Obj()
podWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
UID(podUID).
PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}).
@@ -494,6 +498,23 @@ func TestHandlers(t *testing.T) {
// At the moment, the code reliably cancels right away.
wantEvents: []*v1.Event{cancelPodEviction},
},
"evict-pod-after-scheduling": {
initialState: state{
pods: []*v1.Pod{unscheduledPodWithClaimName},
slices: []*resourceapi.ResourceSlice{sliceTainted, slice2},
allocatedClaims: []allocatedClaim{{ResourceClaim: inUseClaim, evictionTime: &taintTime}},
},
events: []any{
// Normally the scheduler shouldn't schedule when there is a taint,
// but perhaps it didn't know yet.
update(unscheduledPodWithClaimName, podWithClaimName),
},
finalState: state{
slices: []*resourceapi.ResourceSlice{sliceTainted, slice2},
allocatedClaims: []allocatedClaim{{ResourceClaim: inUseClaim, evictionTime: &taintTime}},
evicting: []evictAt{{newObject(podWithClaimName), taintTime.Time}},
},
},
"evict-pod-resourceclaim-unrelated-changes": {
initialState: state{
pods: []*v1.Pod{podWithClaimName},
@@ -1339,22 +1360,22 @@ func TestEviction(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
controller.Run(tCtx)
assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
}()
// Eventually the controller should have synced its informers.
require.Eventually(tCtx, func() bool {
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool {
return controller.hasSynced.Load() > 0
}, 30*time.Second, time.Millisecond, "controller synced")
}).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("controller synced"))
if tt.afterSync != nil {
tt.afterSync(tCtx)
}
// Eventually the pod gets deleted (= evicted).
assert.Eventually(tCtx, func() bool {
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool {
_, err := fakeClientset.CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{})
return apierrors.IsNotFound(err)
}, 30*time.Second, time.Millisecond, "pod evicted")
}).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("pod evicted"))
pod := pod.DeepCopy()
pod.Status.Conditions = []v1.PodCondition{{
@@ -1369,7 +1390,10 @@ func TestEviction(t *testing.T) {
// Shortly after deletion we should also see updated metrics.
// This is the last thing the controller does for a pod.
// However, actually creating the event on the server is asynchronous,
// so we also have to wait for that.
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error {
gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchDeletionEvent())
return testPodDeletionsMetrics(controller, 1)
}).WithTimeout(30*time.Second).Should(gomega.Succeed(), "pod eviction done")
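The test updates above move from testify's assert/require.Eventually to ktesting.Eventually with gomega matchers, which report why a poll failed rather than just that it timed out. ktesting is a Kubernetes-internal test helper; a rough equivalent of the polling pattern using stock gomega looks like this (the test name and condition are illustrative):

```go
package example

import (
	"testing"
	"time"

	"github.com/onsi/gomega"
)

// TestEventuallyPattern polls a condition until it holds or the timeout
// expires; the matcher carries the failure explanation.
func TestEventuallyPattern(t *testing.T) {
	g := gomega.NewWithT(t)

	start := time.Now()
	synced := func() bool { return time.Since(start) > 10*time.Millisecond }

	g.Eventually(synced).
		WithTimeout(30 * time.Second).
		WithPolling(time.Millisecond).
		Should(gomega.BeTrueBecause("controller synced"))
}
```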
@@ -1450,7 +1474,7 @@ func testCancelEviction(tCtx ktesting.TContext, deletePod bool) {
wg.Add(1)
go func() {
defer wg.Done()
controller.Run(tCtx)
assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
}()
// Eventually the pod gets scheduled for eviction.
@@ -1543,15 +1567,15 @@ func TestParallelPodDeletion(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
controller.Run(tCtx)
assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
}()
// Eventually the pod gets deleted, in this test by us.
assert.Eventually(tCtx, func() bool {
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool {
mutex.Lock()
defer mutex.Unlock()
return podGets >= 1
}, 30*time.Second, time.Millisecond, "pod eviction started")
}).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("pod eviction started"))
// We don't want any events.
ktesting.Consistently(tCtx, func(tCtx ktesting.TContext) error {
@@ -1622,11 +1646,12 @@ func TestRetry(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
controller.Run(tCtx)
assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
}()
// Eventually the pod gets deleted.
// Eventually the pod gets deleted and the event is recorded.
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error {
gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchDeletionEvent())
return testPodDeletionsMetrics(controller, 1)
}).WithTimeout(30*time.Second).Should(gomega.Succeed(), "pod eviction done")
@@ -1694,15 +1719,15 @@ func TestEvictionFailure(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
controller.Run(tCtx)
assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
}()
// Eventually deletion is attempted a few times.
assert.Eventually(tCtx, func() bool {
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) int {
mutex.Lock()
defer mutex.Unlock()
return podDeletions >= retries
}, 30*time.Second, time.Millisecond, "pod eviction failed")
return podDeletions
}).WithTimeout(30*time.Second).Should(gomega.BeNumerically(">=", retries), "pod eviction failed")
// Now we can check the API calls.
ktesting.Consistently(tCtx, func(tCtx ktesting.TContext) error {


@@ -57,6 +57,7 @@ type TimedWorker struct {
}
// createWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
// Returns nil if the work was started immediately and doesn't need a timer.
func createWorker(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(ctx context.Context, fireAt time.Time, args *WorkArgs) error, clock clock.WithDelayedExecution) *TimedWorker {
delay := fireAt.Sub(createdAt)
logger := klog.FromContext(ctx)
@@ -90,6 +91,7 @@ func (w *TimedWorker) Cancel() {
type TimedWorkerQueue struct {
sync.Mutex
// map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker.
// Entries may be nil if the work didn't need a timer and is already running.
workers map[string]*TimedWorker
workFunc func(ctx context.Context, fireAt time.Time, args *WorkArgs) error
clock clock.WithDelayedExecution
@@ -145,6 +147,10 @@ func (q *TimedWorkerQueue) UpdateWork(ctx context.Context, args *WorkArgs, creat
q.Lock()
defer q.Unlock()
if worker, exists := q.workers[key]; exists {
if worker == nil {
logger.V(4).Info("Keeping existing work, already in progress", "item", key)
return
}
if worker.FireAt.Compare(fireAt) == 0 {
logger.V(4).Info("Keeping existing work, same time", "item", key, "createTime", worker.CreatedAt, "firedTime", worker.FireAt)
return