DRA device taints: DeviceTaintRule status

To update the right statuses, the controller must collect more information
about why a pod is being evicted. Updating the DeviceTaintRule statuses is
then handled by the same work queue as evicting pods.

Both operations already share the same client instance and thus the same QPS
limits and server-side throttling, so they might as well share the same work
queue. Deleting pods is not necessarily more important than informing users
or vice versa, so there is no strong argument for having different queues.

While at it, the unit tests are switched to the same mock work queue as in
staging/src/k8s.io/dynamic-resource-allocation/internal/workqueue. Because
there is no time to add it properly to a staging repo, the implementation
gets copied.
Author: Patrick Ohly
Date:   2025-10-29 12:03:07 +01:00
Parent: 0689b628c7
Commit: bbf8bc766e

5 changed files with 1972 additions and 376 deletions
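
For orientation before the diff: the dispatch pattern this commit introduces,
condensed from the controller changes below (same types and helpers as in the
diff, queue wiring omitted):

// One work queue carries two kinds of items. Exactly one of the two
// references is set per item; the worker dispatches on which one it is.
type workItem struct {
	podRef  tainteviction.NamespacedObject // pod that may need to be deleted
	ruleRef tainteviction.NamespacedObject // rule whose status may need updating
}

func (tc *Controller) handleWork(ctx context.Context, item workItem) (time.Duration, error) {
	if item.podRef.Name != "" {
		return tc.maybeDeletePod(ctx, item.podRef)
	}
	return tc.maybeUpdateRuleStatus(ctx, item.ruleRef)
}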


@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"iter"
"maps"
"math"
"slices"
"strings"
@@ -38,6 +39,8 @@ import (
"k8s.io/apimachinery/pkg/util/diff"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
metav1ac "k8s.io/client-go/applyconfigurations/meta/v1"
resourceac "k8s.io/client-go/applyconfigurations/resource/v1alpha3"
coreinformers "k8s.io/client-go/informers/core/v1"
resourceinformers "k8s.io/client-go/informers/resource/v1"
resourcealphainformers "k8s.io/client-go/informers/resource/v1alpha3"
@@ -57,6 +60,13 @@ import (
utilpod "k8s.io/kubernetes/pkg/util/pod"
)
const (
// ruleStatusPeriod is the shortest time between DeviceTaintRule status
// updates while eviction is in progress. Once eviction is done, the status
// no longer gets updated until eviction is in progress again.
ruleStatusPeriod = 10 * time.Second
)
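
The period is enforced by re-enqueueing rather than by a timer. A hypothetical
helper distilling the check in maybeUpdateRuleStatus below (the worker turns a
non-zero result into workqueue.AddAfter):

func nextStatusUpdateDelay(lastTransition, now time.Time, sameGeneration bool) time.Duration {
	// Update immediately on a new generation; otherwise wait out the period.
	since := now.Sub(lastTransition)
	if sameGeneration && since < ruleStatusPeriod {
		return ruleStatusPeriod - since
	}
	return 0
}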
// Controller listens to Taint changes of DRA devices and Toleration changes of ResourceClaims,
// then deletes Pods which use ResourceClaims that don't tolerate a NoExecute taint.
// Pods which have already reached a final state (aka terminated) don't need to be deleted.
@@ -93,13 +103,8 @@ type Controller struct {
metrics metrics.Metrics
workqueue workqueue.TypedRateLimitingInterface[workItem]
// evictPod ensures that the pod gets evicted at the specified time.
// It doesn't block.
evictPod func(pod tainteviction.NamespacedObject, fireAt time.Time)
// cancelEvict cancels eviction set up with evictPod earlier.
// Idempotent, returns false if there was nothing to cancel.
cancelEvict func(pod tainteviction.NamespacedObject) bool
evictPodHook func(pod tainteviction.NamespacedObject, eviction evictionAndReason)
cancelEvictHook func(pod tainteviction.NamespacedObject) bool
// mutex protects the following shared data structures.
mutex sync.Mutex
@@ -108,7 +113,7 @@ type Controller struct {
//
// The entry for pod gets deleted when eviction is no longer necessary
// and updated when the time changes.
deletePodAt map[tainteviction.NamespacedObject]time.Time
deletePodAt map[tainteviction.NamespacedObject]evictionAndReason
// maybeDeletePodCount counts how often a worker checked a pod.
// This is useful for unit testing, but probably not a good public metric.
@@ -119,6 +124,20 @@ type Controller struct {
// pools indexes all slices by driver and pool name.
pools map[poolID]pool
// taintRuleStats tracks information about work that was done for a specific DeviceTaintRule instance.
taintRuleStats map[types.UID]taintRuleStats
// simulateRule is set only during simulation of a None effect.
//
// During such a simulation the corresponding rule from ruleLister
// has EffectNone and this one here has EffectNoExecute.
simulateRule *resourcealpha.DeviceTaintRule
}
type taintRuleStats struct {
// numEvictedPods is the number of pods evicted because of this rule since starting the controller.
numEvictedPods int64
}
type poolID struct {
@@ -194,19 +213,19 @@ func (p pool) getTaintedDevices() []taintedDevice {
}
// getDevice looks up one device by name. Outdated slices are ignored.
func (p pool) getDevice(deviceName string) *resourceapi.Device {
func (p pool) getDevice(deviceName string) (*resourceapi.ResourceSlice, *resourceapi.Device) {
for _, slice := range p.slices {
if slice.Spec.Pool.Generation != p.maxGeneration {
continue
}
for i := range slice.Spec.Devices {
if slice.Spec.Devices[i].Name == deviceName {
return &slice.Spec.Devices[i]
return slice, &slice.Spec.Devices[i]
}
}
}
return nil
return nil, nil
}
type taintedDevice struct {
@@ -219,23 +238,134 @@ type taintedDevice struct {
type allocatedClaim struct {
*resourceapi.ResourceClaim
// evictionTime, if non-nil, is the time at which pods using this claim need to be evicted.
// eviction, if non-nil, is the time at which pods using this claim need to be evicted.
// This is the smallest value of all such per-device values.
// For each device, the value is calculated as `<time of setting the taint> +
// <toleration seconds, 0 if not set>`.
evictionTime *metav1.Time
eviction *evictionAndReason
}
// evictionAndReason combines the time when eviction needs to start with all reasons
// why eviction needs to start.
type evictionAndReason struct {
when metav1.Time
reason evictionReason
}
func (et evictionAndReason) String() string {
return fmt.Sprintf("%s (%s)", et.when, et.reason)
}
func (et *evictionAndReason) equal(other *evictionAndReason) bool {
if et == nil || other == nil {
// Equal exactly if both are nil.
return et == other
}
return et.when.Equal(&other.when) &&
slices.Equal(et.reason, other.reason)
}
// evictionReason collects all taints which caused eviction.
// It supports pretty-printing for logging and inclusion in
// user-facing descriptions.
type evictionReason []trackedTaint
func (er evictionReason) String() string {
var parts []string
for _, taint := range er {
parts = append(parts, taint.String())
}
return strings.Join(parts, ", ")
}
// trackedTaint augments a DeviceTaint with a pointer to its origin.
// rule and slice are mutually exclusive. Exactly one of them is always set.
type trackedTaint struct {
rule *resourcealpha.DeviceTaintRule
slice sliceDeviceTaint
}
func (tt trackedTaint) deviceTaint() *resourceapi.DeviceTaint {
if tt.rule != nil {
// TODO when GA: directly point to rule.Spec.Taint.
return &resourceapi.DeviceTaint{
Key: tt.rule.Spec.Taint.Key,
Value: tt.rule.Spec.Taint.Value,
Effect: resourceapi.DeviceTaintEffect(tt.rule.Spec.Taint.Effect),
TimeAdded: tt.rule.Spec.Taint.TimeAdded,
}
}
if tt.slice.slice != nil {
index := slices.IndexFunc(tt.slice.slice.Spec.Devices, func(d resourceapi.Device) bool { return d.Name == tt.slice.deviceName })
return &tt.slice.slice.Spec.Devices[index].Taints[tt.slice.taintIndex]
}
// Neither rule nor slice is set: should not happen.
return nil
}
func (tt trackedTaint) String() string {
if tt.rule != nil {
return fmt.Sprintf("DeviceTaintRule %s", newObject(tt.rule))
}
return fmt.Sprintf("ResourceSlice %s %s/%s/%s taint #%d", newObject(tt.slice.slice), tt.slice.slice.Spec.Driver, tt.slice.slice.Spec.Pool.Name, tt.slice.deviceName, tt.slice.taintIndex)
}
func (tt trackedTaint) Compare(b trackedTaint) int {
// Rules first...
if tt.rule != nil && b.rule == nil {
return -1
}
if tt.rule == nil && b.rule != nil {
return 1
}
if tt.rule != nil {
// By rule name.
return strings.Compare(tt.rule.Name, b.rule.Name)
}
if tt.slice.slice != nil && b.slice.slice != nil {
// Sort by driver/pool/device, then taint index.
if cmp := strings.Compare(tt.slice.slice.Spec.Driver, b.slice.slice.Spec.Driver); cmp != 0 {
return cmp
}
if cmp := strings.Compare(tt.slice.slice.Spec.Pool.Name, b.slice.slice.Spec.Pool.Name); cmp != 0 {
return cmp
}
if cmp := strings.Compare(tt.slice.deviceName, b.slice.deviceName); cmp != 0 {
return cmp
}
return tt.slice.taintIndex - b.slice.taintIndex
}
// Both empty? Either way, we cannot compare further.
return 0
}
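
This total order makes reason lists deterministic; claimEvictionTime below
sorts deduplicated reasons with it:

slices.SortFunc(reasons, func(a, b trackedTaint) int { return a.Compare(b) })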
// sliceDeviceTaint references one taint entry in a ResourceSlice device.
type sliceDeviceTaint struct {
slice *resourceapi.ResourceSlice
deviceName string
taintIndex int
}
// workItem is stored in a workqueue and describes some piece of work which
// needs to be done.
//
// That work is either deleting a pod or updating a DeviceTaintRule status.
type workItem struct {
// podRef references a pod which may need to be deleted.
// podRef, if not empty, references a pod which may need to be deleted.
//
// Controller.deletePodAt is the source of truth for whether and when the pod really needs to be removed.
podRef tainteviction.NamespacedObject
// ruleRef, if not empty, is a DeviceTaintRule whose status may have to be updated.
//
// The initial update is done as quickly as possible to give immediate feedback,
// then following updates are done at regular intervals (see ruleStatusPeriod).
ruleRef tainteviction.NamespacedObject
}
func workItemForRule(rule *resourcealpha.DeviceTaintRule) workItem {
return workItem{ruleRef: tainteviction.NamespacedObject{NamespacedName: types.NamespacedName{Name: rule.Name}, UID: rule.UID}}
}
// maybeDeletePod checks whether the pod needs to be deleted now and if so, does it.
@@ -245,22 +375,22 @@ type workItem struct {
// - a zero delay if the deletion is done or no longer necessary
func (tc *Controller) maybeDeletePod(ctx context.Context, podRef tainteviction.NamespacedObject) (againAfter time.Duration, finalErr error) {
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "pod", podRef)
// We must not hold this mutex while doing blocking API calls.
// TODO: try an atomic map instead.
tc.mutex.Lock()
tc.maybeDeletePodCount++
fireAt, ok := tc.deletePodAt[podRef]
eviction, ok := tc.deletePodAt[podRef]
tc.mutex.Unlock()
logger.V(5).Info("Processing pod deletion work item", "pod", podRef, "active", ok, "fireAt", fireAt)
logger.V(5).Info("Processing pod deletion work item", "active", ok, "eviction", eviction)
if !ok {
logger.V(5).Info("Work item for pod deletion obsolete, nothing to do", "pod", podRef)
logger.V(5).Info("Work item for pod deletion obsolete, nothing to do")
return 0, nil
}
now := time.Now()
againAfter = fireAt.Sub(now)
againAfter = eviction.when.Sub(now)
if againAfter > 0 {
// Not yet. Maybe the eviction time got updated.
return againAfter, nil
@@ -286,11 +416,25 @@ func (tc *Controller) maybeDeletePod(ctx context.Context, podRef tainteviction.N
return 0, err
}
podDeletionLatency := time.Since(fireAt)
// TODO: include more information why it was evicted.
klog.FromContext(ctx).Info("Evicted pod by deleting it", "pod", podRef, "latency", podDeletionLatency)
podDeletionLatency := time.Since(eviction.when.Time)
logger.V(2).Info("Evicted pod by deleting it", "latency", podDeletionLatency, "reason", eviction.reason)
tc.metrics.PodDeletionsTotal.Inc()
tc.metrics.PodDeletionsLatency.Observe(float64(podDeletionLatency.Seconds()))
tc.mutex.Lock()
defer tc.mutex.Unlock()
for _, reason := range eviction.reason {
if reason.rule != nil {
stats := tc.taintRuleStats[reason.rule.UID]
stats.numEvictedPods++
tc.taintRuleStats[reason.rule.UID] = stats
// Ensure that the status gets updated eventually.
// Doing this immediately is not useful because
// it would just race with the informer update
// (the rule status is read from the cache!).
tc.workqueue.AddAfter(workItemForRule(reason.rule), ruleStatusPeriod)
}
}
return 0, nil
}
@@ -339,6 +483,234 @@ func (tc *Controller) addConditionAndDeletePod(ctx context.Context, podRef taint
})
}
func (tc *Controller) maybeUpdateRuleStatus(ctx context.Context, ruleRef tainteviction.NamespacedObject) (time.Duration, error) {
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "deviceTaintRule", ruleRef)
logger.V(5).Info("Processing DeviceTaintRule status work item")
tc.mutex.Lock()
defer tc.mutex.Unlock()
rule, err := tc.ruleLister.Get(ruleRef.Name)
if apierrors.IsNotFound(err) {
logger.V(5).Info("DeviceTaintRule got deleted, removing from work queue")
return 0, nil
}
if err != nil {
return 0, fmt.Errorf("get DeviceTaintRule %s: %w", ruleRef.Name, err)
}
if rule.UID != ruleRef.UID {
logger.V(5).Info("DeviceTaintRule got replaced, removing old one from work queue")
return 0, nil
}
// Already set?
index := slices.IndexFunc(rule.Status.Conditions, func(condition metav1.Condition) bool {
return condition.Type == resourcealpha.DeviceTaintConditionEvictionInProgress
})
// LastTransitionTime gets bumped each time we make any change to the condition,
// even if it is only a change of the message. We use this to track when it is
// time for another update.
//
// This is intentionally checked before counting pending pods because that might
// be expensive. The effect is that "eviction in progress" gets set to false
// only with a certain delay instead of immediately after deleting the last
// pod.
var existingCondition metav1.Condition
now := metav1.Now()
if index >= 0 {
existingCondition = rule.Status.Conditions[index]
since := now.Time.Sub(existingCondition.LastTransitionTime.Time)
if existingCondition.ObservedGeneration == rule.Generation &&
since < ruleStatusPeriod {
// Don't update quite yet.
return ruleStatusPeriod - since, nil
}
}
// Checking all pods might be expensive. Only do it if really needed.
var numTaintedSliceDevices, numTaintedAllocatedDevices, numPendingPods, numPendingNamespaces int64
switch rule.Spec.Taint.Effect {
case resourcealpha.DeviceTaintEffectNone:
// Temporarily change the effect from None to NoExecute to simulate.
// We pretend to do that through informer events. We hold the lock,
// so there is no race with real informer events or other goroutines.
//
// To avoid having a lasting impact on the real controller instance
// we make a temporary copy.
ruleEvict := rule.DeepCopy()
ruleEvict.Spec.Taint.Effect = resourcealpha.DeviceTaintEffectNoExecute
tc := &Controller{
logger: klog.LoggerWithName(logger, "simulation"),
podLister: tc.podLister,
ruleLister: nil, // Replaced by simulateRule.
deletePodAt: make(map[tainteviction.NamespacedObject]evictionAndReason),
allocatedClaims: maps.Clone(tc.allocatedClaims),
pools: tc.pools,
simulateRule: ruleEvict,
// TODO: stub implementation
workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[workItem](), workqueue.TypedRateLimitingQueueConfig[workItem]{}),
}
defer tc.workqueue.ShutDown()
tc.handleRuleChange(rule, ruleEvict)
numPendingPods, numPendingNamespaces, err = tc.countPendingPods(rule)
numTaintedSliceDevices, numTaintedAllocatedDevices = tc.countTaintedDevices(rule)
case resourcealpha.DeviceTaintEffectNoExecute:
numPendingPods, numPendingNamespaces, err = tc.countPendingPods(rule)
default:
err = nil
}
if err != nil {
return 0, fmt.Errorf("determine pending pods: %w", err)
}
// Some fields are tentative and get updated below.
newCondition := metav1.Condition{
Type: resourcealpha.DeviceTaintConditionEvictionInProgress,
Status: metav1.ConditionFalse,
Reason: string("Effect" + rule.Spec.Taint.Effect),
ObservedGeneration: rule.Generation,
LastTransitionTime: existingCondition.LastTransitionTime, // To avoid a false "is different" in the comparison, gets updated later.
}
switch rule.Spec.Taint.Effect {
case resourcealpha.DeviceTaintEffectNoExecute:
switch {
case numPendingPods > 0:
newCondition.Reason = "PodsPendingEviction"
newCondition.Status = metav1.ConditionTrue
if numPendingPods == 1 {
newCondition.Message = "1 pod needs to be evicted in "
} else {
newCondition.Message = fmt.Sprintf("%d pods need to be evicted in ", numPendingPods)
}
if numPendingNamespaces == 1 {
newCondition.Message += "1 namespace."
} else {
newCondition.Message += fmt.Sprintf("%d different namespaces.", numPendingNamespaces)
}
case tc.taintRuleStats[rule.UID].numEvictedPods > 0:
newCondition.Reason = "Completed"
default:
newCondition.Reason = "NotStarted"
}
case resourcealpha.DeviceTaintEffectNone:
newCondition.Reason = "NoEffect"
if numTaintedSliceDevices == 1 {
newCondition.Message += "1 published device selected. "
} else {
newCondition.Message += fmt.Sprintf("%d published devices selected. ", numTaintedSliceDevices)
}
if numTaintedAllocatedDevices == 1 {
newCondition.Message += "1 allocated device selected. "
} else {
newCondition.Message += fmt.Sprintf("%d allocated devices selected. ", numTaintedAllocatedDevices)
}
if numPendingPods == 1 {
newCondition.Message += "1 pod would be evicted in "
} else {
newCondition.Message += fmt.Sprintf("%d pods would be evicted in ", numPendingPods)
}
if numPendingNamespaces == 1 {
newCondition.Message += "1 namespace "
} else {
newCondition.Message += fmt.Sprintf("%d different namespaces ", numPendingNamespaces)
}
newCondition.Message += "if the effect was NoExecute. This information will not be updated again. Recreate the DeviceTaintRule to trigger an update."
default:
newCondition.Reason = "OtherEffect"
newCondition.Message = "Eviction only happens for the NoExecute effect."
}
if numEvictedPods := tc.taintRuleStats[rule.UID].numEvictedPods; numEvictedPods > 0 {
if newCondition.Message != "" {
newCondition.Message += " "
}
if numEvictedPods == 1 {
newCondition.Message += "1 pod "
} else {
newCondition.Message += fmt.Sprintf("%d pods ", numEvictedPods)
}
newCondition.Message += "evicted since starting the controller."
}
if newCondition != existingCondition {
newCondition.LastTransitionTime = now
logger.V(4).Info("Calculated new condition", "condition", newCondition)
// Apply the new condition, but only if the UID matches.
ruleAC := resourceac.DeviceTaintRule(rule.Name).WithUID(rule.UID).WithStatus(resourceac.DeviceTaintRuleStatus().WithConditions(&metav1ac.ConditionApplyConfiguration{
Type: &newCondition.Type,
Status: &newCondition.Status,
Reason: &newCondition.Reason,
Message: &newCondition.Message,
ObservedGeneration: &newCondition.ObservedGeneration,
LastTransitionTime: &newCondition.LastTransitionTime,
}))
if _, err := tc.client.ResourceV1alpha3().DeviceTaintRules().ApplyStatus(ctx, ruleAC, metav1.ApplyOptions{FieldManager: tc.name, Force: true}); err != nil {
return 0, fmt.Errorf("add condition to DeviceTaintRule status: %w", err)
}
}
// No further updates needed until some more pods get evicted or ready for eviction.
return 0, nil
}
func (tc *Controller) countPendingPods(rule *resourcealpha.DeviceTaintRule) (int64, int64, error) {
pods, err := tc.podLister.List(labels.Everything())
if err != nil {
return -1, -1, fmt.Errorf("list pods: %w", err)
}
namespaces := sets.New[string]()
var numPendingPods int64
for _, pod := range pods {
if pod.DeletionTimestamp != nil {
continue
}
eviction := tc.podEvictionTime(pod)
if eviction == nil {
continue
}
for _, reason := range eviction.reason {
if reason.rule != nil &&
reason.rule.UID == rule.UID {
numPendingPods++
namespaces.Insert(pod.Namespace)
}
}
}
return numPendingPods, int64(namespaces.Len()), nil
}
// countTaintedDevices determines the number of devices in slices matching the rule and
// the number of allocated devices matching the rule.
func (tc *Controller) countTaintedDevices(rule *resourcealpha.DeviceTaintRule) (numTaintedSliceDevices int64, numTaintedAllocatedDevices int64) {
for poolID, pool := range tc.pools {
for _, slice := range pool.slices {
if slice.Spec.Pool.Generation != pool.maxGeneration {
continue
}
for _, device := range slice.Spec.Devices {
if ruleMatchesDevice(rule, poolID.driverName, poolID.poolName, device.Name) {
numTaintedSliceDevices++
}
}
}
}
for _, claim := range tc.allocatedClaims {
for _, allocatedDevice := range claim.Status.Allocation.Devices.Results {
if ruleMatchesDevice(rule, allocatedDevice.Driver, allocatedDevice.Pool, allocatedDevice.Device) {
numTaintedAllocatedDevices++
}
}
}
return
}
// New creates a new Controller that will use passed clientset to communicate with the API server.
// Spawns no goroutines. That happens in Run.
func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInformer resourceinformers.ResourceClaimInformer, sliceInformer resourceinformers.ResourceSliceInformer, taintInformer resourcealphainformers.DeviceTaintRuleInformer, classInformer resourceinformers.DeviceClassInformer, controllerName string) *Controller {
@@ -355,9 +727,10 @@ func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInfo
taintInformer: taintInformer,
classInformer: classInformer,
ruleLister: taintInformer.Lister(),
deletePodAt: make(map[tainteviction.NamespacedObject]time.Time),
deletePodAt: make(map[tainteviction.NamespacedObject]evictionAndReason),
allocatedClaims: make(map[types.NamespacedName]allocatedClaim),
pools: make(map[poolID]pool),
taintRuleStats: make(map[types.UID]taintRuleStats),
// Instantiate all informers now to ensure that they get started.
haveSynced: []cache.InformerSynced{
podInformer.Informer().HasSynced,
@@ -377,7 +750,7 @@ func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInfo
func (tc *Controller) Run(ctx context.Context, numWorkers int) error {
defer utilruntime.HandleCrash()
logger := klog.FromContext(ctx)
logger.Info("Starting", "controller", tc.name)
logger.Info("Starting", "controller", tc.name, "numWorkers", numWorkers)
defer logger.Info("Shut down controller", "controller", tc.name, "reason", context.Cause(ctx))
tc.logger = logger
@@ -414,32 +787,6 @@ func (tc *Controller) Run(ctx context.Context, numWorkers int) error {
tc.workqueue.ShutDown()
}()
evictPod := tc.evictPod
tc.evictPod = func(podRef tainteviction.NamespacedObject, fireAt time.Time) {
tc.deletePodAt[podRef] = fireAt
now := time.Now()
tc.workqueue.AddAfter(workItem{podRef: podRef}, fireAt.Sub(now))
// Only relevant for testing.
if evictPod != nil {
evictPod(podRef, fireAt)
}
}
cancelEvict := tc.cancelEvict
tc.cancelEvict = func(podRef tainteviction.NamespacedObject) bool {
_, ok := tc.deletePodAt[podRef]
if !ok {
// Nothing to cancel.
return false
}
delete(tc.deletePodAt, podRef)
if cancelEvict != nil {
cancelEvict(podRef)
}
// Cannot remove from a work queue. The worker will detect that the entry is obsolete by checking deletePodAt.
return true
}
// Start events processing pipeline.
eventBroadcaster.StartStructuredLogging(3)
if tc.client != nil {
@@ -654,6 +1001,36 @@ func (tc *Controller) Run(ctx context.Context, numWorkers int) error {
return nil
}
// evictPod ensures that the pod gets evicted at the specified time.
// It doesn't block.
func (tc *Controller) evictPod(podRef tainteviction.NamespacedObject, eviction evictionAndReason) {
tc.deletePodAt[podRef] = eviction
now := time.Now()
tc.workqueue.AddAfter(workItem{podRef: podRef}, eviction.when.Sub(now))
if tc.evictPodHook != nil {
tc.evictPodHook(podRef, eviction)
}
}
// cancelEvict cancels eviction set up with evictPod earlier.
// Idempotent, returns false if there was nothing to cancel.
func (tc *Controller) cancelEvict(podRef tainteviction.NamespacedObject) bool {
_, ok := tc.deletePodAt[podRef]
if !ok {
// Nothing to cancel.
return false
}
delete(tc.deletePodAt, podRef)
if tc.cancelEvictHook != nil {
tc.cancelEvictHook(podRef)
}
// Cannot remove from a work queue. The worker will detect that the entry is obsolete by checking deletePodAt.
return true
}
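
Both hooks are test seams. A sketch of how a unit test in this package might
record scheduled evictions (the tc test fixture is assumed):

var mu sync.Mutex
var scheduled []tainteviction.NamespacedObject
tc.evictPodHook = func(podRef tainteviction.NamespacedObject, eviction evictionAndReason) {
	// Record the pod without interfering with the real queue handling.
	mu.Lock()
	defer mu.Unlock()
	scheduled = append(scheduled, podRef)
}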
// worker blocks until the workqueue is shut down.
// Cancellation of the context only aborts on-going work.
func (tc *Controller) worker(ctx context.Context) {
@@ -669,17 +1046,13 @@ func (tc *Controller) worker(ctx context.Context) {
func() {
defer tc.workqueue.Done(item)
var err error
var againAfter time.Duration
if item.podRef.Name != "" {
againAfter, err = tc.maybeDeletePod(ctx, item.podRef)
}
againAfter, err := tc.handleWork(ctx, item)
switch {
case err != nil:
logger.V(3).Info("Evicting pod failed, will retry", "err", err)
logger.V(3).Info("Processing work item failed, will retry", "err", err)
tc.workqueue.AddRateLimited(item)
case againAfter > 0:
logger.V(5).Info("Checking pod eviction again later", "delay", againAfter)
logger.V(5).Info("Checking work item again later", "delay", againAfter)
tc.workqueue.AddAfter(item, againAfter)
default:
tc.workqueue.Forget(item)
@@ -688,6 +1061,13 @@ func (tc *Controller) worker(ctx context.Context) {
}
}
func (tc *Controller) handleWork(ctx context.Context, item workItem) (time.Duration, error) {
if item.podRef.Name != "" {
return tc.maybeDeletePod(ctx, item.podRef)
}
return tc.maybeUpdateRuleStatus(ctx, item.ruleRef)
}
func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.ResourceClaim) {
claim := newClaim
if claim == nil {
@@ -713,7 +1093,7 @@ func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.Resource
}
tc.allocatedClaims[name] = allocatedClaim{
ResourceClaim: claim,
evictionTime: tc.evictionTime(claim),
eviction: tc.claimEvictionTime(claim),
}
tc.handlePods(claim)
return
@@ -742,7 +1122,7 @@ func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.Resource
if oldClaim.Status.Allocation == nil && newClaim.Status.Allocation != nil {
tc.allocatedClaims[name] = allocatedClaim{
ResourceClaim: claim,
evictionTime: tc.evictionTime(claim),
eviction: tc.claimEvictionTime(claim),
}
syncBothClaims()
return
@@ -761,7 +1141,7 @@ func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.Resource
// time. Storing the newer claim is enough.
tc.allocatedClaims[name] = allocatedClaim{
ResourceClaim: claim,
evictionTime: tc.allocatedClaims[name].evictionTime,
eviction: tc.allocatedClaims[name].eviction,
}
syncBothClaims()
return
@@ -770,24 +1150,25 @@ func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.Resource
// If we get here, nothing changed.
}
// evictionTime returns the earliest TimeAdded of any NoExecute taint in any allocated device
// claimEvictionTime returns the earliest TimeAdded of any NoExecute taint in any allocated device
// unless that taint is tolerated, nil if none. May only be called for allocated claims.
func (tc *Controller) evictionTime(claim *resourceapi.ResourceClaim) *metav1.Time {
var evictionTime *metav1.Time
func (tc *Controller) claimEvictionTime(claim *resourceapi.ResourceClaim) *evictionAndReason {
var when *metav1.Time
var taints sets.Set[trackedTaint]
allocation := claim.Status.Allocation
for _, allocatedDevice := range allocation.Devices.Results {
id := poolID{driverName: allocatedDevice.Driver, poolName: allocatedDevice.Pool}
device := tc.pools[id].getDevice(allocatedDevice.Device)
slice, device := tc.pools[id].getDevice(allocatedDevice.Device)
nextTaint:
for taint := range tc.allEvictingDeviceTaints(allocatedDevice, device) {
newEvictionTime := taint.TimeAdded
for taint := range tc.allEvictingDeviceTaints(allocatedDevice, slice, device) {
newEvictionTime := taint.deviceTaint().TimeAdded
haveToleration := false
tolerationSeconds := int64(math.MaxInt64)
for _, toleration := range allocatedDevice.Tolerations {
if toleration.Effect == resourceapi.DeviceTaintEffectNoExecute &&
resourceclaim.ToleratesTaint(toleration, *taint) {
resourceclaim.ToleratesTaint(toleration, *taint.deviceTaint()) {
if toleration.TolerationSeconds == nil {
// Tolerate forever -> ignore taint.
continue nextTaint
@@ -805,72 +1186,90 @@ func (tc *Controller) evictionTime(claim *resourceapi.ResourceClaim) *metav1.Tim
if haveToleration {
newEvictionTime = &metav1.Time{Time: newEvictionTime.Add(time.Duration(tolerationSeconds) * time.Second)}
}
if evictionTime == nil {
evictionTime = newEvictionTime
tc.logger.V(5).Info("Claim is affected by device taint", "claim", klog.KObj(claim), "device", allocatedDevice, "taint", taint, "evictionTime", evictionTime)
if taints == nil {
taints = sets.New[trackedTaint]()
}
taints.Insert(taint)
if when == nil {
when = newEvictionTime
tc.logger.V(5).Info("Claim is affected by device taint", "claim", klog.KObj(claim), "device", allocatedDevice, "taint", taint, "evictionTime", when)
continue
}
if newEvictionTime != nil && newEvictionTime.Before(evictionTime) {
evictionTime = newEvictionTime
tc.logger.V(5).Info("Claim is affected by device taint", "claim", klog.KObj(claim), "device", allocatedDevice, "taint", taint, "evictionTime", evictionTime)
if newEvictionTime != nil && newEvictionTime.Before(when) {
when = newEvictionTime
tc.logger.V(5).Info("Claim is affected by device taint", "claim", klog.KObj(claim), "device", allocatedDevice, "taint", taint, "evictionTime", when)
}
}
}
return evictionTime
if when == nil {
return nil
}
eviction := &evictionAndReason{when: *when, reason: taints.UnsortedList()}
slices.SortFunc(eviction.reason, func(a, b trackedTaint) int { return a.Compare(b) })
return eviction
}
// allEvictingDeviceTaints allows iterating over all device taints with NoExecute effect which affect the allocated device.
// A taint may come from either the ResourceSlice informer (not the tracker!) or from a DeviceTaintRule, but not both.
func (tc *Controller) allEvictingDeviceTaints(allocatedDevice resourceapi.DeviceRequestAllocationResult, device *resourceapi.Device) iter.Seq[*resourceapi.DeviceTaint] {
rules, err := tc.ruleLister.List(labels.Everything())
// TODO: instead of listing and handling an error, keep track of rules in the informer event handler?
if err != nil {
panic(err)
func (tc *Controller) allEvictingDeviceTaints(allocatedDevice resourceapi.DeviceRequestAllocationResult, slice *resourceapi.ResourceSlice, device *resourceapi.Device) iter.Seq[trackedTaint] {
var rules []*resourcealpha.DeviceTaintRule
var err error
if tc.ruleLister != nil {
rules, err = tc.ruleLister.List(labels.Everything())
if err != nil {
// TODO: instead of listing and handling an error, keep track of rules in the informer event handler?
utilruntime.HandleErrorWithLogger(tc.logger, err, "Local cache failed to list DeviceTaintRules")
return func(yield func(trackedTaint) bool) {}
}
}
return func(yield func(*resourceapi.DeviceTaint) bool) {
if tc.simulateRule != nil {
// Mix the rule for which we simulate EffectNoExecute into the set of
// rules which will be evaluated. Typically this is the only rule
// during simulation.
rules = append(rules, tc.simulateRule)
}
return func(yield func(trackedTaint) bool) {
if device != nil {
for i := range device.Taints {
taint := &device.Taints[i]
if taint.Effect != resourceapi.DeviceTaintEffectNoExecute {
continue
}
if !yield(taint) {
if !yield(trackedTaint{slice: sliceDeviceTaint{slice: slice, deviceName: device.Name, taintIndex: i}}) {
return
}
}
}
for _, rule := range rules {
if rule.Spec.Taint.Effect != resourcealpha.DeviceTaintEffectNoExecute {
if rule.Spec.Taint.Effect != resourcealpha.DeviceTaintEffectNoExecute ||
!ruleMatchesDevice(rule, allocatedDevice.Driver, allocatedDevice.Pool, allocatedDevice.Device) {
continue
}
selector := rule.Spec.DeviceSelector
if selector == nil {
continue
}
if selector.Driver != nil && *selector.Driver != allocatedDevice.Driver ||
selector.Pool != nil && *selector.Pool != allocatedDevice.Pool ||
selector.Device != nil && *selector.Device != allocatedDevice.Device {
continue
}
if !yield(
// TODO when GA: directly point to rule.Spec.Taint.
&resourceapi.DeviceTaint{
Key: rule.Spec.Taint.Key,
Value: rule.Spec.Taint.Value,
Effect: resourceapi.DeviceTaintEffect(rule.Spec.Taint.Effect),
TimeAdded: rule.Spec.Taint.TimeAdded,
},
) {
if !yield(trackedTaint{rule: rule}) {
return
}
}
}
}
func ruleMatchesDevice(rule *resourcealpha.DeviceTaintRule, driverName, poolName, deviceName string) bool {
selector := rule.Spec.DeviceSelector
if selector == nil {
return false
}
if selector.Driver != nil && *selector.Driver != driverName ||
selector.Pool != nil && *selector.Pool != poolName ||
selector.Device != nil && *selector.Device != deviceName {
return false
}
return true
}
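
Note the asymmetry: a nil selector matches nothing, while a nil field inside a
selector matches anything. A sketch with hypothetical values:

driver := "dra.example.com" // hypothetical driver name
rule := &resourcealpha.DeviceTaintRule{
	Spec: resourcealpha.DeviceTaintRuleSpec{
		DeviceSelector: &resourcealpha.DeviceTaintSelector{
			Driver: &driver, // Pool and Device left nil: any pool/device of this driver.
		},
	},
}
_ = ruleMatchesDevice(rule, "dra.example.com", "pool-a", "gpu-0")                  // true
_ = ruleMatchesDevice(rule, "other.example.com", "pool-a", "gpu-0")                // false
_ = ruleMatchesDevice(&resourcealpha.DeviceTaintRule{}, driver, "pool-a", "gpu-0") // false: nil selector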
func (tc *Controller) handleRuleChange(oldRule, newRule *resourcealpha.DeviceTaintRule) {
rule := newRule
if rule == nil {
@@ -882,6 +1281,18 @@ func (tc *Controller) handleRuleChange(oldRule, newRule *resourcealpha.DeviceTai
tc.eventLogger.Info("DeviceTaintRule changed", "ruleObject", name, "oldRule", klog.Format(oldRule), "newRule", klog.Format(newRule), "diff", diff.Diff(oldRule, newRule))
}
if oldRule == nil {
// Update the status at least once.
tc.workqueue.Add(workItemForRule(newRule))
}
if newRule == nil {
// Clean up to avoid memory leak.
delete(tc.taintRuleStats, oldRule.UID)
// Removal from the work queue is handled when a worker handles the work item.
// A work queue does not support canceling work.
}
if oldRule != nil &&
newRule != nil &&
oldRule.UID == newRule.UID &&
@@ -893,10 +1304,10 @@ func (tc *Controller) handleRuleChange(oldRule, newRule *resourcealpha.DeviceTai
for name, oldAllocatedClaim := range tc.allocatedClaims {
newAllocatedClaim := allocatedClaim{
ResourceClaim: oldAllocatedClaim.ResourceClaim,
eviction: tc.claimEvictionTime(oldAllocatedClaim.ResourceClaim),
}
newAllocatedClaim.evictionTime = tc.evictionTime(oldAllocatedClaim.ResourceClaim)
tc.allocatedClaims[name] = newAllocatedClaim
if !newAllocatedClaim.evictionTime.Equal(oldAllocatedClaim.evictionTime) {
if !newAllocatedClaim.eviction.equal(oldAllocatedClaim.eviction) {
tc.handlePods(newAllocatedClaim.ResourceClaim)
}
}
@@ -919,9 +1330,6 @@ func (tc *Controller) handleSliceChange(oldSlice, newSlice *resourceapi.Resource
// Determine old and new device taints. Only devices
// where something changes trigger additional checks for claims
// using them.
//
// The pre-allocated slices are small enough to be allocated on
// the stack (https://stackoverflow.com/a/69187698/222305).
p := tc.pools[poolID]
oldDeviceTaints := p.getTaintedDevices()
p.removeSlice(oldSlice)
@@ -978,12 +1386,12 @@ func (tc *Controller) handleSliceChange(oldSlice, newSlice *resourceapi.Resource
if !usesDevice(claim.Status.Allocation, poolID, modifiedDevices) {
continue
}
newEvictionTime := tc.evictionTime(claim.ResourceClaim)
if newEvictionTime.Equal(claim.evictionTime) {
newEvictionTime := tc.claimEvictionTime(claim.ResourceClaim)
if newEvictionTime.equal(claim.eviction) {
// No change.
continue
}
claim.evictionTime = newEvictionTime
claim.eviction = newEvictionTime
tc.allocatedClaims[name] = claim
// We could collect pods which depend on claims with changes.
// In practice, most pods probably depend on one claim, so
@@ -1038,7 +1446,7 @@ func (tc *Controller) handlePodChange(oldPod, newPod *v1.Pod) {
func (tc *Controller) handlePods(claim *resourceapi.ResourceClaim) {
for _, consumer := range claim.Status.ReservedFor {
if consumer.APIGroup == "" && consumer.Resource == "pods" {
pod, err := tc.podInformer.Lister().Pods(claim.Namespace).Get(consumer.Name)
pod, err := tc.podLister.Pods(claim.Namespace).Get(consumer.Name)
if err != nil {
if apierrors.IsNotFound(err) {
return
@@ -1057,14 +1465,35 @@ func (tc *Controller) handlePods(claim *resourceapi.ResourceClaim) {
}
func (tc *Controller) handlePod(pod *v1.Pod) {
eviction := tc.podEvictionTime(pod)
podRef := newObject(pod)
if eviction == nil {
if tc.cancelWorkWithEvent(podRef) {
tc.logger.V(3).Info("Canceled pod eviction", "pod", podRef)
}
return
}
tc.logger.V(3).Info("Going to evict pod", "pod", podRef, "eviction", eviction)
tc.evictPod(podRef, *eviction)
// If any reason is because of a taint, then eviction is in progress and the status may need to be updated.
for _, reason := range eviction.reason {
if reason.rule != nil {
tc.workqueue.Add(workItemForRule(reason.rule))
}
}
}
func (tc *Controller) podEvictionTime(pod *v1.Pod) *evictionAndReason {
// Not scheduled yet? No need to evict.
if pod.Spec.NodeName == "" {
return
return nil
}
// If any claim in use by the pod is tainted such that the taint is not tolerated,
// the pod needs to be evicted.
var evictionTime *metav1.Time
var eviction *evictionAndReason
for i := range pod.Spec.ResourceClaims {
claimName, mustCheckOwner, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
if err != nil {
@@ -1090,27 +1519,45 @@ func (tc *Controller) handlePod(pod *v1.Pod) {
// replacement under the same name. Either way, ignore.
continue
}
if allocatedClaim.evictionTime == nil {
if allocatedClaim.eviction == nil {
continue
}
if evictionTime == nil || allocatedClaim.evictionTime.Before(evictionTime) {
evictionTime = allocatedClaim.evictionTime
if eviction == nil {
// Use the new eviction time as-is.
eviction = allocatedClaim.eviction
} else {
// Join reasons and figure out the new time.
// Might not actually lead to any change.
newEvictionTime := &evictionAndReason{
when: allocatedClaim.eviction.when,
reason: slices.Clone(allocatedClaim.eviction.reason),
}
// Multiple reasons affecting the same pod should be so rare,
// a simple insertion sort is fine.
for _, reason := range eviction.reason {
index, found := slices.BinarySearchFunc(newEvictionTime.reason, reason, func(a, b trackedTaint) int { return a.Compare(b) })
if !found {
newEvictionTime.reason = slices.Insert(newEvictionTime.reason, index, reason)
}
}
if eviction.when.Before(&newEvictionTime.when) {
newEvictionTime.when = eviction.when
}
if !eviction.equal(newEvictionTime) {
eviction = newEvictionTime
}
}
tc.logger.V(3).Info("Going to evict pod", "evictionTime", *evictionTime, "claim", klog.KObj(allocatedClaim))
}
podRef := newObject(pod)
if evictionTime != nil {
tc.evictPod(podRef, evictionTime.Time)
} else {
tc.cancelWorkWithEvent(podRef)
}
return eviction
}
func (tc *Controller) cancelWorkWithEvent(podRef tainteviction.NamespacedObject) {
func (tc *Controller) cancelWorkWithEvent(podRef tainteviction.NamespacedObject) bool {
if tc.cancelEvict(podRef) {
tc.emitCancelPodDeletionEvent(podRef)
return true
}
return false
}
func (tc *Controller) emitPodDeletionEvent(podRef tainteviction.NamespacedObject) {
@@ -1148,7 +1595,11 @@ func newNamespacedName(obj metav1.Object) types.NamespacedName {
}
}
// TODO: replace with klog.ObjectInstance (https://github.com/kubernetes/klog/issues/422#issuecomment-3454948091).
func newObject(obj metav1.Object) tainteviction.NamespacedObject {
if obj == nil {
return tainteviction.NamespacedObject{}
}
return tainteviction.NamespacedObject{
NamespacedName: newNamespacedName(obj),
UID: obj.GetUID(),


@@ -0,0 +1,257 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devicetainteviction
import (
"maps"
"slices"
"sync"
"time"
"k8s.io/client-go/util/workqueue"
)
// TODO (pohly): move this to k8s.io/client-go/util/workqueue/mockqueue.go
// if it turns out to be generally useful. Doc comments are already written
// as if the code was there.
// MockQueue is an implementation of [TypedRateLimitingInterface] which
// can be used to test a function which pulls work items out of a queue
// and processes them. It is thread-safe.
//
// A zero-value instance is directly usable. The usual usage is:
//
// var m workqueue.Mock[string]
// m.SyncOne("some-item", func(queue workqueue.TypedRateLimitingInterface[string]) { ... } )
// if diff := cmp.Diff(workqueue.MockState[string]{}, m.State()); diff != "" {
// t.Errorf("unexpected state of mock work queue after sync (-want, +got):\n%s", diff)
// }
//
// All slices get reset to nil when they become empty, so there are no spurious
// differences because of nil vs. empty slice.
type Mock[T comparable] struct {
mutex sync.Mutex
state MockState[T]
}
type MockState[T comparable] struct {
// Ready contains the items which are ready for processing.
Ready []T
// InFlight contains the items which are currently being processed (= Get
// was called, Done not yet).
InFlight []T
// MismatchedDone contains the items for which Done was called without
// a matching Get.
MismatchedDone []T
// Later contains the items which are meant to be added to the queue after
// a certain delay (= AddAfter was called for them). They appear in the
// order in which AddAfter got called.
Later []MockDelayedItem[T]
// Failures contains the items and their retry count which failed to be
// processed (AddRateLimited called at least once, Forget not yet).
// The retry count is always larger than zero.
Failures map[T]int
// ShutDownCalled tracks how often ShutDown got called.
ShutDownCalled int
// ShutDownWithDrainCalled tracks how often ShutDownWithDrain got called.
ShutDownWithDrainCalled int
}
// DeepCopy takes a snapshot of all slices. It cannot do a deep copy of the items in those slices,
// but typically those items are immutable.
func (m MockState[T]) DeepCopy() *MockState[T] {
m.Ready = slices.Clone(m.Ready)
m.InFlight = slices.Clone(m.InFlight)
m.MismatchedDone = slices.Clone(m.MismatchedDone)
m.Later = slices.Clone(m.Later)
m.Failures = maps.Clone(m.Failures)
return &m
}
// MockDelayedItem is an item which was queued for later processing.
type MockDelayedItem[T comparable] struct {
Item T
Duration time.Duration
}
// SyncOne adds the item to the work queue and calls sync.
// That sync function can pull one or more items from the work
// queue until the queue is empty. Then it is told that the queue
// is shutting down, which must cause it to return.
//
// The test can then retrieve the state of the queue to check the result.
func (m *Mock[T]) SyncOne(item T, sync func(workqueue.TypedRateLimitingInterface[T])) {
// sync must run with the mutex not locked.
defer sync(m)
m.mutex.Lock()
defer m.mutex.Unlock()
m.state.Ready = append(m.state.Ready, item)
}
// State returns the current state of the queue.
func (m *Mock[T]) State() MockState[T] {
m.mutex.Lock()
defer m.mutex.Unlock()
return *m.state.DeepCopy()
}
// Add implements [TypedInterface].
func (m *Mock[T]) Add(item T) {
m.mutex.Lock()
defer m.mutex.Unlock()
if !slices.Contains(m.state.Ready, item) {
m.state.Ready = append(m.state.Ready, item)
}
}
// Len implements [TypedInterface].
func (m *Mock[T]) Len() int {
m.mutex.Lock()
defer m.mutex.Unlock()
return len(m.state.Ready)
}
// Get implements [TypedInterface].
func (m *Mock[T]) Get() (item T, shutdown bool) {
m.mutex.Lock()
defer m.mutex.Unlock()
if len(m.state.Ready) == 0 {
shutdown = true
return
}
item = m.state.Ready[0]
m.state.Ready = m.state.Ready[1:]
if len(m.state.Ready) == 0 {
m.state.Ready = nil
}
m.state.InFlight = append(m.state.InFlight, item)
return item, false
}
// Done implements [TypedInterface].
func (m *Mock[T]) Done(item T) {
m.mutex.Lock()
defer m.mutex.Unlock()
index := slices.Index(m.state.InFlight, item)
if index < 0 {
m.state.MismatchedDone = append(m.state.MismatchedDone, item)
// Without a matching Get there is nothing to delete; slices.Delete
// would panic on the negative index.
return
}
m.state.InFlight = slices.Delete(m.state.InFlight, index, index+1)
if len(m.state.InFlight) == 0 {
m.state.InFlight = nil
}
}
// ShutDown implements [TypedInterface].
func (m *Mock[T]) ShutDown() {
m.mutex.Lock()
defer m.mutex.Unlock()
m.state.ShutDownCalled++
}
// ShutDownWithDrain implements [TypedInterface].
func (m *Mock[T]) ShutDownWithDrain() {
m.mutex.Lock()
defer m.mutex.Unlock()
m.state.ShutDownWithDrainCalled++
}
// ShuttingDown implements [TypedInterface].
func (m *Mock[T]) ShuttingDown() bool {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.state.ShutDownCalled > 0 || m.state.ShutDownWithDrainCalled > 0
}
// AddAfter implements [TypedDelayingInterface.AddAfter].
func (m *Mock[T]) AddAfter(item T, duration time.Duration) {
if duration == 0 {
m.Add(item)
return
}
m.mutex.Lock()
defer m.mutex.Unlock()
for i := range m.state.Later {
if m.state.Later[i].Item == item {
// https://github.com/kubernetes/client-go/blob/270e5ab1714527c455865953da8ceba2810dbb50/util/workqueue/delaying_queue.go#L340-L349
// only shortens the delay for an existing item. It does not make it longer.
if m.state.Later[i].Duration > duration {
m.state.Later[i].Duration = duration
}
return
}
}
m.state.Later = append(m.state.Later, MockDelayedItem[T]{Item: item, Duration: duration})
}
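
An illustration of the shorten-only semantics:

var m Mock[string]
m.AddAfter("x", 10*time.Second)
m.AddAfter("x", 30*time.Second) // ignored: the existing delay is shorter
m.AddAfter("x", 2*time.Second)  // shortens the pending delay to 2s
// m.State().Later == []MockDelayedItem[string]{{Item: "x", Duration: 2 * time.Second}}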
// CancelAfter is an extension of the TypedDelayingInterface: it allows a test to remove an item that may or may not have been added before via AddAfter.
func (m *Mock[T]) CancelAfter(item T) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.state.Later = slices.DeleteFunc(m.state.Later, func(later MockDelayedItem[T]) bool {
return later.Item == item
})
}
// AddRateLimited implements [TypedRateLimitingInterface.AddRateLimited].
func (m *Mock[T]) AddRateLimited(item T) {
m.mutex.Lock()
defer m.mutex.Unlock()
if m.state.Failures == nil {
m.state.Failures = make(map[T]int)
}
m.state.Failures[item]++
}
// Forget implements [TypedRateLimitingInterface.Forget].
func (m *Mock[T]) Forget(item T) {
m.mutex.Lock()
defer m.mutex.Unlock()
if m.state.Failures == nil {
return
}
delete(m.state.Failures, item)
}
// NumRequeues implements [TypedRateLimitingInterface.NumRequeues].
func (m *Mock[T]) NumRequeues(item T) int {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.state.Failures[item]
}
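
Putting it together, a typical round-trip in a test (inside a test function;
process is a hypothetical stand-in for the code under test, cmp is
github.com/google/go-cmp/cmp):

var m Mock[string]
m.SyncOne("claim-1", func(q workqueue.TypedRateLimitingInterface[string]) {
	for {
		item, shutdown := q.Get()
		if shutdown {
			return
		}
		process(item) // hypothetical
		q.Done(item)
		q.Forget(item)
	}
})
// After a clean run nothing is ready, in flight, failed, or mismatched.
if diff := cmp.Diff(MockState[string]{}, m.State()); diff != "" {
	t.Errorf("unexpected state of mock work queue after sync (-want, +got):\n%s", diff)
}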


@@ -224,6 +224,8 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
rbacv1helpers.NewRule("get", "list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
// Sets pod conditions.
rbacv1helpers.NewRule("update", "patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie(),
// Sets DeviceTaintRule conditions.
rbacv1helpers.NewRule("update", "patch").Groups(resourceGroup).Resources("devicetaintrules/status").RuleOrDie(),
// The rest is read-only.
rbacv1helpers.NewRule("get", "list", "watch").Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "watch").Groups(resourceGroup).Resources("resourceslices").RuleOrDie(),


@@ -216,6 +216,16 @@ func (m *Mock[T]) AddAfter(item T, duration time.Duration) {
m.state.Later = append(m.state.Later, MockDelayedItem[T]{Item: item, Duration: duration})
}
// CancelAfter is an extension of the TypedDelayingInterface: it allows a test to remove an item that may or may not have been added before via AddAfter.
func (m *Mock[T]) CancelAfter(item T) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.state.Later = slices.DeleteFunc(m.state.Later, func(later MockDelayedItem[T]) bool {
return later.Item == item
})
}
// AddRateLimited implements [TypedRateLimitingInterface.AddRateLimited].
func (m *Mock[T]) AddRateLimited(item T) {
m.mutex.Lock()