// TODO: This is duplicated from ./pkg/scheduler/util/queue because that
// package is not allowed to be imported here.

const (
	// normalSize limits the size of the buffer that is kept
	// for reuse.
	normalSize = 4
)

// FIFO implements a first-in-first-out queue with unbounded size.
// The zero value is a valid empty queue.
//
// Access must be protected by the caller when used concurrently by
// different goroutines, the queue itself implements no locking.
type FIFO[T any] struct {
	// elements contains a buffer for elements which have been
	// pushed and not popped yet. Two scenarios are possible:
	// - one chunk in the middle (start <= end)
	// - one chunk at the end, followed by one chunk at the
	//   beginning (end <= start)
	//
	// start == end can be either an empty queue or a completely
	// full one (with two chunks).
	elements []T

	// len counts the number of elements which have been pushed and
	// not popped yet.
	len int

	// start is the index of the first valid element.
	start int

	// end is the index after the last valid element.
	end int
}

// Len returns the number of elements which have been pushed and not
// popped yet.
func (q *FIFO[T]) Len() int {
	return q.len
}

// Push appends element at the end of the queue. The buffer is doubled
// (or allocated with normalSize entries) when it is full.
func (q *FIFO[T]) Push(element T) {
	size := len(q.elements)
	if q.len == size {
		// Need larger buffer.
		newSize := size * 2
		if newSize == 0 {
			newSize = normalSize
		}
		elements := make([]T, newSize)
		// The buffer is full, so the valid elements are the chunk
		// from start to the end of the buffer followed by the chunk
		// in front of start (empty when start == 0). Copy them in
		// queue order to the beginning of the new buffer.
		n := copy(elements, q.elements[q.start:])
		copy(elements[n:], q.elements[:q.start])
		q.start = 0
		q.end = q.len
		q.elements = elements
		size = newSize
	}
	if q.end == size {
		// Wrap around. The queue is not full here, so slot 0 is free.
		q.end = 0
	}
	q.elements[q.end] = element
	q.end++
	q.len++
}

// Pop removes and returns the oldest element. ok is false and element
// is the zero value of T when the queue is empty.
func (q *FIFO[T]) Pop() (element T, ok bool) {
	if q.len == 0 {
		return element, false
	}
	element = q.elements[q.start]

	// Clear the slot so that the buffer does not keep the popped value
	// (for example a pointer or closure) reachable until the slot
	// happens to be overwritten by a later Push.
	var zero T
	q.elements[q.start] = zero

	q.start++
	if q.start == len(q.elements) {
		// Wrap around.
		q.start = 0
	}
	q.len--

	// Once it is empty, shrink down to avoid hanging onto
	// a large buffer forever.
	if q.len == 0 && len(q.elements) > normalSize {
		q.elements = make([]T, normalSize)
		q.start = 0
		q.end = 0
	}

	return element, true
}
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func verifyPop(t *testing.T, expectedValue int, expectedOk bool, queue *FIFO[int]) { + t.Helper() + actual, ok := queue.Pop() + require.Equal(t, expectedOk, ok) + require.Equal(t, expectedValue, actual) +} + +func verifyEmpty(t *testing.T, queue *FIFO[int]) { + t.Helper() + require.Equal(t, 0, queue.Len()) + verifyPop(t, 0, false, queue) +} + +func TestNull(t *testing.T) { + var queue FIFO[int] + verifyEmpty(t, &queue) +} + +func TestOnePushPop(t *testing.T) { + var queue FIFO[int] + + expected := 10 + queue.Push(10) + require.Equal(t, 1, queue.Len()) + verifyPop(t, expected, true, &queue) + verifyEmpty(t, &queue) +} + +// Pushes some elements, pops all of them, then the same again. +func TestWrapAroundEmpty(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < 5; i++ { + queue.Push(i) + } + require.Equal(t, 5, queue.Len()) + for i := 0; i < 5; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) + + for i := 5; i < 10; i++ { + queue.Push(i) + } + for i := 5; i < 10; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) +} + +// Pushes some elements, pops one, adds more, then pops all. +func TestWrapAroundPartial(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < 5; i++ { + queue.Push(i) + } + require.Equal(t, 5, queue.Len()) + verifyPop(t, 0, true, &queue) + + for i := 5; i < 10; i++ { + queue.Push(i) + } + for i := 1; i < 10; i++ { + verifyPop(t, i, true, &queue) + } + verifyEmpty(t, &queue) +} + +// Push an unusual amount of elements, pop all, and verify that +// the FIFO shrinks back again. 
+func TestShrink(t *testing.T) { + var queue FIFO[int] + + for i := 0; i < normalSize*2; i++ { + queue.Push(i) + } + require.Equal(t, normalSize*2, queue.Len()) + require.LessOrEqual(t, 2*normalSize, len(queue.elements)) + + // Pop all, should be shrunken when done. + for i := 0; i < normalSize*2; i++ { + verifyPop(t, i, true, &queue) + } + require.Equal(t, 0, queue.Len()) + require.Len(t, queue.elements, normalSize) + + // Still usable after shrinking? + queue.Push(42) + verifyPop(t, 42, true, &queue) + require.Equal(t, 0, queue.Len()) + require.Len(t, queue.elements, normalSize) +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go new file mode 100644 index 00000000000..2f198b13099 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go @@ -0,0 +1,798 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package tracker + +import ( + "context" + "errors" + "fmt" + "slices" + "sync" + + "github.com/google/go-cmp/cmp" //nolint:depguard + + v1 "k8s.io/api/core/v1" + resourcealphaapi "k8s.io/api/resource/v1alpha3" + resourceapi "k8s.io/api/resource/v1beta1" + labels "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + resourcealphainformers "k8s.io/client-go/informers/resource/v1alpha3" + resourceinformers "k8s.io/client-go/informers/resource/v1beta1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + resourcelisters "k8s.io/client-go/listers/resource/v1beta1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/dynamic-resource-allocation/cel" + "k8s.io/dynamic-resource-allocation/internal/queue" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" +) + +const ( + driverPoolDeviceIndexName = "driverPoolDevice" + + anyDriver = "*" + anyPool = "*" + anyDevice = "*" +) + +// Tracker maintains a view of ResourceSlice objects with matching +// DeviceTaintRules applied. It is backed by informers to process +// potential changes to resolved ResourceSlices asynchronously. +type Tracker struct { + enableDeviceTaints bool + + resourceSliceLister resourcelisters.ResourceSliceLister + resourceSlices cache.SharedIndexInformer + resourceSlicesHandle cache.ResourceEventHandlerRegistration + deviceTaints cache.SharedIndexInformer + deviceTaintsHandle cache.ResourceEventHandlerRegistration + deviceClasses cache.SharedIndexInformer + deviceClassesHandle cache.ResourceEventHandlerRegistration + celCache *cel.Cache + patchedResourceSlices cache.Store + broadcaster record.EventBroadcaster + recorder record.EventRecorder + // handleError usually refers to [utilruntime.HandleErrorWithContext] but + // may be overridden in tests. 
+ handleError func(context.Context, error, string, ...any) + + // Synchronizes updates to these fields related to event handlers. + rwMutex sync.RWMutex + // All registered event handlers. + eventHandlers []cache.ResourceEventHandler + // The eventQueue contains functions which deliver an event to one + // event handler. + // + // These functions must be invoked while *not locking* rwMutex because + // the event handlers are allowed to access the cache. Holding rwMutex + // then would cause a deadlock. + // + // New functions get added as part of processing a cache update while + // the rwMutex is locked. Each function which adds something to the queue + // also drains the queue before returning, therefore it is guaranteed + // that all event handlers get notified immediately (useful for unit + // testing). + // + // A channel cannot be used here because it cannot have an unbounded + // capacity. This could lead to a deadlock (writer holds rwMutex, + // gets blocked because capacity is exhausted, reader is in a handler + // which tries to lock the rwMutex). Writing into such a channel + // while not holding the rwMutex doesn't work because in-order delivery + // of events would no longer be guaranteed. + eventQueue queue.FIFO[func()] +} + +// Options configure a [Tracker]. +type Options struct { + // EnableDeviceTaints controls whether DeviceTaintRules + // will be reflected in ResourceSlices reported by the tracker. + // + // If false, then TaintInformer and ClassInformer + // are not needed. The tracker turns into + // a thin wrapper around the underlying + // SliceInformer, with no processing of its own. + EnableDeviceTaints bool + + SliceInformer resourceinformers.ResourceSliceInformer + TaintInformer resourcealphainformers.DeviceTaintRuleInformer + ClassInformer resourceinformers.DeviceClassInformer + + // KubeClient is used to generate Events when CEL expressions + // encounter runtime errors. 
+ KubeClient kubernetes.Interface +} + +// StartTracker creates and initializes informers for a new [Tracker]. +func StartTracker(ctx context.Context, opts Options) (finalT *Tracker, finalErr error) { + if !opts.EnableDeviceTaints { + // Minimal wrapper. All public methods shortcut by calling the underlying informer. + return &Tracker{ + resourceSliceLister: opts.SliceInformer.Lister(), + resourceSlices: opts.SliceInformer.Informer(), + }, nil + } + + t, err := newTracker(ctx, opts) + if err != nil { + return nil, err + } + defer func() { + // If we don't return the tracker, stop the partially initialized instance. + if finalErr != nil { + t.Stop() + } + }() + if err := t.initInformers(ctx); err != nil { + return nil, fmt.Errorf("initialize informers: %w", err) + } + return t, nil +} + +// newTracker is used in testing to construct a tracker without informer event handlers. +func newTracker(ctx context.Context, opts Options) (finalT *Tracker, finalErr error) { + t := &Tracker{ + enableDeviceTaints: opts.EnableDeviceTaints, + resourceSliceLister: opts.SliceInformer.Lister(), + resourceSlices: opts.SliceInformer.Informer(), + deviceTaints: opts.TaintInformer.Informer(), + deviceClasses: opts.ClassInformer.Informer(), + celCache: cel.NewCache(10), + patchedResourceSlices: cache.NewStore(cache.MetaNamespaceKeyFunc), + handleError: utilruntime.HandleErrorWithContext, + } + defer func() { + // If we don't return the tracker, stop the partially initialized instance. + if finalErr != nil { + t.Stop() + } + }() + err := t.resourceSlices.AddIndexers(cache.Indexers{driverPoolDeviceIndexName: sliceDriverPoolDeviceIndexFunc}) + if err != nil { + return nil, fmt.Errorf("failed to add %s index to ResourceSlice informer: %w", driverPoolDeviceIndexName, err) + } + // KubeClient is not always set in unit tests. 
+ if opts.KubeClient != nil { + t.broadcaster = record.NewBroadcaster(record.WithContext(ctx)) + t.broadcaster.StartLogging(klog.Infof) + t.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: opts.KubeClient.CoreV1().Events("")}) + t.recorder = t.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_slice_tracker"}) + } + + return t, nil +} + +// initInformers adds event handlers to a tracker constructed with newTracker. +func (t *Tracker) initInformers(ctx context.Context) error { + var err error + + sliceHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: t.resourceSliceAdd(ctx), + UpdateFunc: t.resourceSliceUpdate(ctx), + DeleteFunc: t.resourceSliceDelete(ctx), + } + t.resourceSlicesHandle, err = t.resourceSlices.AddEventHandler(sliceHandler) + if err != nil { + return fmt.Errorf("add event handler for ResourceSlices: %w", err) + } + + taintHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: t.deviceTaintAdd(ctx), + UpdateFunc: t.deviceTaintUpdate(ctx), + DeleteFunc: t.deviceTaintDelete(ctx), + } + t.deviceTaintsHandle, err = t.deviceTaints.AddEventHandler(taintHandler) + if err != nil { + return fmt.Errorf("add event handler for DeviceTaintRules: %w", err) + } + + classHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: t.deviceClassAdd(ctx), + UpdateFunc: t.deviceClassUpdate(ctx), + DeleteFunc: t.deviceClassDelete(ctx), + } + t.deviceClassesHandle, err = t.deviceClasses.AddEventHandler(classHandler) + if err != nil { + return fmt.Errorf("add event handler for DeviceClasses: %w", err) + } + + return nil +} + +// HasSynced returns true if the tracker is done with processing all +// currently existing input objects. Adding a new event handler at that +// point is possible and will emit events with up-to-date ResourceSlice +// objects. 
+func (t *Tracker) HasSynced() bool { + if !t.enableDeviceTaints { + return t.resourceSlices.HasSynced() + } + + if t.resourceSlicesHandle != nil && !t.resourceSlicesHandle.HasSynced() { + return false + } + if t.deviceTaintsHandle != nil && !t.deviceTaintsHandle.HasSynced() { + return false + } + if t.deviceClassesHandle != nil && !t.deviceClassesHandle.HasSynced() { + return false + } + + return true +} + +// Stop ends all background activity and blocks until that shutdown is complete. +func (t *Tracker) Stop() { + if !t.enableDeviceTaints { + return + } + + if t.broadcaster != nil { + t.broadcaster.Shutdown() + } + _ = t.resourceSlices.RemoveEventHandler(t.resourceSlicesHandle) + _ = t.deviceTaints.RemoveEventHandler(t.deviceTaintsHandle) + _ = t.deviceClasses.RemoveEventHandler(t.deviceClassesHandle) +} + +// ListPatchedResourceSlices returns all ResourceSlices in the cluster with +// modifications from DeviceTaints applied. +func (t *Tracker) ListPatchedResourceSlices() ([]*resourceapi.ResourceSlice, error) { + if !t.enableDeviceTaints { + return t.resourceSliceLister.List(labels.Everything()) + } + + return typedSlice[*resourceapi.ResourceSlice](t.patchedResourceSlices.List()), nil +} + +// AddEventHandler adds an event handler to the tracker. Events to a +// single handler are delivered sequentially, but there is no +// coordination between different handlers. A handler may use the +// tracker. +// +// The return value can be used to wait for cache synchronization. +// All currently know ResourceSlices get delivered via Add events +// before this method returns. 
+func (t *Tracker) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { + if !t.enableDeviceTaints { + return t.resourceSlices.AddEventHandler(handler) + } + + defer t.emitEvents() + t.rwMutex.Lock() + defer t.rwMutex.Unlock() + + t.eventHandlers = append(t.eventHandlers, handler) + allObjs, _ := t.ListPatchedResourceSlices() + for _, obj := range allObjs { + t.eventQueue.Push(func() { + handler.OnAdd(obj, true) + }) + } + + // The tracker itself provides HasSynced for all registered event handlers. + // We don't support removal, so returning the same handle here for all + // of them is fine. + return t, nil +} + +// emitEvents delivers all pending events that are in the queue, in the order +// in which they were stored there (FIFO). +func (t *Tracker) emitEvents() { + for { + t.rwMutex.Lock() + deliver, ok := t.eventQueue.Pop() + t.rwMutex.Unlock() + + if !ok { + return + } + func() { + defer utilruntime.HandleCrash() + deliver() + }() + } +} + +// pushEvent ensures that all currently registered event handlers get +// notified about a change when the caller starts delivering +// those with emitEvents. +// +// For a delete event, newObj is nil. For an add, oldObj is nil. +// An update has both as non-nil. 
+func (t *Tracker) pushEvent(oldObj, newObj any) { + t.rwMutex.Lock() + defer t.rwMutex.Unlock() + for _, handler := range t.eventHandlers { + handler := handler + if oldObj == nil { + t.eventQueue.Push(func() { + handler.OnAdd(newObj, false) + }) + } else if newObj == nil { + t.eventQueue.Push(func() { + handler.OnDelete(oldObj) + }) + } else { + t.eventQueue.Push(func() { + handler.OnUpdate(oldObj, newObj) + }) + } + } +} + +func sliceDriverPoolDeviceIndexFunc(obj any) ([]string, error) { + slice := obj.(*resourceapi.ResourceSlice) + drivers := []string{ + anyDriver, + slice.Spec.Driver, + } + pools := []string{ + anyPool, + slice.Spec.Pool.Name, + } + indexValues := make([]string, 0, len(drivers)*len(pools)*(1+len(slice.Spec.Devices))) + for _, driver := range drivers { + for _, pool := range pools { + indexValues = append(indexValues, deviceID(driver, pool, anyDevice)) + for _, device := range slice.Spec.Devices { + indexValues = append(indexValues, deviceID(driver, pool, device.Name)) + } + } + } + return indexValues, nil +} + +func driverPoolDeviceIndexPatchKey(patch *resourcealphaapi.DeviceTaintRule) string { + deviceSelector := ptr.Deref(patch.Spec.DeviceSelector, resourcealphaapi.DeviceTaintSelector{}) + driverKey := ptr.Deref(deviceSelector.Driver, anyDriver) + poolKey := ptr.Deref(deviceSelector.Pool, anyPool) + deviceKey := ptr.Deref(deviceSelector.Device, anyDevice) + return deviceID(driverKey, poolKey, deviceKey) +} + +func (t *Tracker) sliceNamesForPatch(ctx context.Context, patch *resourcealphaapi.DeviceTaintRule) []string { + patchKey := driverPoolDeviceIndexPatchKey(patch) + sliceNames, err := t.resourceSlices.GetIndexer().IndexKeys(driverPoolDeviceIndexName, patchKey) + if err != nil { + t.handleError(ctx, err, "failed listing ResourceSlices for driver/pool/device key", "key", patchKey) + return nil + } + return sliceNames +} + +func (t *Tracker) resourceSliceAdd(ctx context.Context) func(obj any) { + logger := klog.FromContext(ctx) + return 
func(obj any) { + slice, ok := obj.(*resourceapi.ResourceSlice) + if !ok { + return + } + logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice)) + t.syncSlice(ctx, slice.Name, true) + } +} + +func (t *Tracker) resourceSliceUpdate(ctx context.Context) func(oldObj, newObj any) { + logger := klog.FromContext(ctx) + return func(oldObj, newObj any) { + oldSlice, ok := oldObj.(*resourceapi.ResourceSlice) + if !ok { + return + } + newSlice, ok := newObj.(*resourceapi.ResourceSlice) + if !ok { + return + } + if loggerV := logger.V(6); loggerV.Enabled() { + // While debugging, one needs a full dump of the objects for context *and* + // a diff because otherwise small changes would be hard to spot. + loggerV.Info("ResourceSlice update", "slice", klog.Format(oldSlice), "oldSlice", klog.Format(newSlice), "diff", cmp.Diff(oldSlice, newSlice)) + } else { + logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice)) + } + t.syncSlice(ctx, newSlice.Name, true) + } +} + +func (t *Tracker) resourceSliceDelete(ctx context.Context) func(obj any) { + logger := klog.FromContext(ctx) + return func(obj any) { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + slice, ok := obj.(*resourceapi.ResourceSlice) + if !ok { + return + } + logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice)) + t.syncSlice(ctx, slice.Name, true) + } +} + +func (t *Tracker) deviceTaintAdd(ctx context.Context) func(obj any) { + logger := klog.FromContext(ctx) + return func(obj any) { + patch, ok := obj.(*resourcealphaapi.DeviceTaintRule) + if !ok { + return + } + logger.V(5).Info("DeviceTaintRule add", "patch", klog.KObj(patch)) + for _, sliceName := range t.sliceNamesForPatch(ctx, patch) { + t.syncSlice(ctx, sliceName, false) + } + } +} + +func (t *Tracker) deviceTaintUpdate(ctx context.Context) func(oldObj, newObj any) { + logger := klog.FromContext(ctx) + return func(oldObj, newObj any) { + oldPatch, ok := 
oldObj.(*resourcealphaapi.DeviceTaintRule) + if !ok { + return + } + newPatch, ok := newObj.(*resourcealphaapi.DeviceTaintRule) + if !ok { + return + } + if loggerV := logger.V(6); loggerV.Enabled() { + loggerV.Info("DeviceTaintRule update", "patch", klog.KObj(newPatch), "diff", cmp.Diff(oldPatch, newPatch)) + } else { + logger.V(5).Info("DeviceTaintRule update", "patch", klog.KObj(newPatch)) + } + + // Slices that matched the old patch may need to be updated, in + // case they no longer match the new patch and need to have the + // patch's changes reverted. + slicesToSync := sets.New[string]() + slicesToSync.Insert(t.sliceNamesForPatch(ctx, oldPatch)...) + slicesToSync.Insert(t.sliceNamesForPatch(ctx, newPatch)...) + for _, sliceName := range slicesToSync.UnsortedList() { + t.syncSlice(ctx, sliceName, false) + } + } +} + +func (t *Tracker) deviceTaintDelete(ctx context.Context) func(obj any) { + logger := klog.FromContext(ctx) + return func(obj any) { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + patch, ok := obj.(*resourcealphaapi.DeviceTaintRule) + if !ok { + return + } + logger.V(5).Info("DeviceTaintRule delete", "patch", klog.KObj(patch)) + for _, sliceName := range t.sliceNamesForPatch(ctx, patch) { + t.syncSlice(ctx, sliceName, false) + } + } +} + +func (t *Tracker) deviceClassAdd(ctx context.Context) func(obj any) { + logger := klog.FromContext(ctx) + return func(obj any) { + class, ok := obj.(*resourceapi.DeviceClass) + if !ok { + return + } + logger.V(5).Info("DeviceClass add", "class", klog.KObj(class)) + for _, sliceName := range t.resourceSlices.GetIndexer().ListKeys() { + t.syncSlice(ctx, sliceName, false) + } + } +} + +func (t *Tracker) deviceClassUpdate(ctx context.Context) func(oldObj, newObj any) { + logger := klog.FromContext(ctx) + return func(oldObj, newObj any) { + oldClass, ok := oldObj.(*resourceapi.DeviceClass) + if !ok { + return + } + newClass, ok := newObj.(*resourceapi.DeviceClass) + if !ok 
{ + return + } + if loggerV := logger.V(6); loggerV.Enabled() { + loggerV.Info("DeviceClass update", "class", klog.KObj(newClass), "diff", cmp.Diff(oldClass, newClass)) + } else { + logger.V(5).Info("DeviceClass update", "class", klog.KObj(newClass)) + } + for _, sliceName := range t.resourceSlices.GetIndexer().ListKeys() { + t.syncSlice(ctx, sliceName, false) + } + } +} + +func (t *Tracker) deviceClassDelete(ctx context.Context) func(obj any) { + logger := klog.FromContext(ctx) + return func(obj any) { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + class, ok := obj.(*resourceapi.ResourceSlice) + if !ok { + return + } + logger.V(5).Info("DeviceClass delete", "class", klog.KObj(class)) + for _, sliceName := range t.resourceSlices.GetIndexer().ListKeys() { + t.syncSlice(ctx, sliceName, false) + } + } +} + +// syncSlice updates the slice with the given name, applying +// DeviceTaints that match. sendEvent is used to force the Tracker +// to publish an event for listeners added by [Tracker.AddEventHandler]. It +// is set when syncSlice is triggered by a ResourceSlice event to avoid +// doing costly DeepEqual comparisons where possible. 
+func (t *Tracker) syncSlice(ctx context.Context, name string, sendEvent bool) { + defer t.emitEvents() + + logger := klog.FromContext(ctx) + logger = klog.LoggerWithValues(logger, "resourceslice", name) + ctx = klog.NewContext(ctx, logger) + logger.V(5).Info("syncing ResourceSlice") + + obj, sliceExists, err := t.resourceSlices.GetIndexer().GetByKey(name) + if err != nil { + t.handleError(ctx, err, "failed to lookup existing resource slice", "resourceslice", name) + return + } + oldPatchedObj, oldSliceExists, err := t.patchedResourceSlices.GetByKey(name) + if err != nil { + t.handleError(ctx, err, "failed to lookup cached patched resource slice", "resourceslice", name) + return + } + if !sliceExists { + err := t.patchedResourceSlices.Delete(oldPatchedObj) + if err != nil { + t.handleError(ctx, err, "failed to delete cached patched resource slice", "resourceslice", name) + return + } + t.pushEvent(oldPatchedObj, nil) + logger.V(5).Info("patched ResourceSlice deleted") + return + } + var oldPatchedSlice *resourceapi.ResourceSlice + if oldSliceExists { + var ok bool + oldPatchedSlice, ok = oldPatchedObj.(*resourceapi.ResourceSlice) + if !ok { + t.handleError(ctx, errors.New("invalid type in resource slice cache"), "expectedType", fmt.Sprintf("%T", (*resourceapi.ResourceSlice)(nil)), "gotType", fmt.Sprintf("%T", oldPatchedObj)) + return + } + } + slice, ok := obj.(*resourceapi.ResourceSlice) + if !ok { + t.handleError(ctx, errors.New("invalid type in resource slice cache"), fmt.Sprintf("expected type to be %T, got %T", (*resourceapi.ResourceSlice)(nil), obj)) + return + } + + patches := typedSlice[*resourcealphaapi.DeviceTaintRule](t.deviceTaints.GetIndexer().List()) + patchedSlice, err := t.applyPatches(ctx, slice, patches) + if err != nil { + t.handleError(ctx, err, "failed to apply patches to ResourceSlice", "resourceslice", klog.KObj(slice)) + return + } + + // When syncSlice is triggered by something other than a ResourceSlice + // event, only the device 
attributes and capacity might change. We + // deliberately avoid any costly DeepEqual-style comparisons here. + if !sendEvent && oldPatchedSlice != nil { + for i := range patchedSlice.Spec.Devices { + oldDevice := oldPatchedSlice.Spec.Devices[i] + newDevice := patchedSlice.Spec.Devices[i] + sendEvent = sendEvent || + !slices.EqualFunc(getTaints(oldDevice), getTaints(newDevice), taintsEqual) + } + } + + err = t.patchedResourceSlices.Add(patchedSlice) + if err != nil { + t.handleError(ctx, err, "failed to add patched resource slice to cache", "resourceslice", klog.KObj(patchedSlice)) + return + } + if sendEvent { + t.pushEvent(oldPatchedObj, patchedSlice) + } + + if loggerV := logger.V(6); loggerV.Enabled() { + loggerV.Info("ResourceSlice synced", "diff", cmp.Diff(oldPatchedObj, patchedSlice)) + } else { + logger.V(5).Info("ResourceSlice synced") + } +} + +func (t *Tracker) applyPatches(ctx context.Context, slice *resourceapi.ResourceSlice, taintRules []*resourcealphaapi.DeviceTaintRule) (*resourceapi.ResourceSlice, error) { + logger := klog.FromContext(ctx) + + // slice will be DeepCopied just-in-time, only when necessary. 
+ patchedSlice := slice + + for _, taintRule := range taintRules { + logger := klog.LoggerWithValues(logger, "deviceTaintRule", klog.KObj(taintRule)) + logger.V(6).Info("processing DeviceTaintRule") + + deviceSelector := taintRule.Spec.DeviceSelector + var deviceClassExprs []cel.CompilationResult + var selectorExprs []cel.CompilationResult + var deviceName *string + if deviceSelector != nil { + if deviceSelector.Driver != nil && *deviceSelector.Driver != slice.Spec.Driver { + logger.V(7).Info("DeviceTaintRule does not apply, mismatched driver", "sliceDriver", slice.Spec.Driver, "taintDriver", *deviceSelector.Driver) + continue + } + if deviceSelector.Pool != nil && *deviceSelector.Pool != slice.Spec.Pool.Name { + logger.V(7).Info("DeviceTaintRule does not apply, mismatched pool", "slicePool", slice.Spec.Pool.Name, "taintPool", *deviceSelector.Pool) + continue + } + deviceName = deviceSelector.Device + if deviceSelector.DeviceClassName != nil { + logger := logger.WithValues("deviceClassName", *deviceSelector.DeviceClassName) + classObj, exists, err := t.deviceClasses.GetIndexer().GetByKey(*deviceSelector.DeviceClassName) + if err != nil { + return nil, fmt.Errorf("failed to get device class %s for DeviceTaintRule %s", *deviceSelector.DeviceClassName, taintRule.Name) + } + if !exists { + logger.V(7).Info("DeviceTaintRule does not apply, DeviceClass does not exist") + continue + } + class := classObj.(*resourceapi.DeviceClass) + for _, selector := range class.Spec.Selectors { + if selector.CEL != nil { + expr := t.celCache.GetOrCompile(selector.CEL.Expression) + deviceClassExprs = append(deviceClassExprs, expr) + } + } + } + for _, selector := range deviceSelector.Selectors { + if selector.CEL != nil { + expr := t.celCache.GetOrCompile(selector.CEL.Expression) + selectorExprs = append(selectorExprs, expr) + } + } + } + devices: + for dIndex, device := range slice.Spec.Devices { + deviceID := deviceID(slice.Spec.Driver, slice.Spec.Pool.Name, device.Name) + logger := 
logger.WithValues("device", deviceID) + + if deviceName != nil && *deviceName != device.Name { + logger.V(7).Info("DeviceTaintRule does not apply, mismatched device", "sliceDevice", device.Name, "taintDevice", *deviceSelector.Device) + continue + } + + deviceAttributes := getAttributes(device) + deviceCapacity := getCapacity(device) + + for i, expr := range deviceClassExprs { + if expr.Error != nil { + // Could happen if some future apiserver accepted some + // future expression and then got downgraded. Normally + // the "stored expression" mechanism prevents that, but + // this code here might be more than one release older + // than the cluster it runs in. + return nil, fmt.Errorf("DeviceTaintRule %s: class %s: selector #%d: CEL compile error: %w", taintRule.Name, *deviceSelector.DeviceClassName, i, expr.Error) + } + matches, details, err := expr.DeviceMatches(ctx, cel.Device{Driver: slice.Spec.Driver, Attributes: deviceAttributes, Capacity: deviceCapacity}) + logger.V(7).Info("CEL result", "class", *deviceSelector.DeviceClassName, "selector", i, "expression", expr.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err) + if err != nil { + continue devices + } + if !matches { + continue devices + } + } + + for i, expr := range selectorExprs { + if expr.Error != nil { + // Could happen if some future apiserver accepted some + // future expression and then got downgraded. Normally + // the "stored expression" mechanism prevents that, but + // this code here might be more than one release older + // than the cluster it runs in. 
+ return nil, fmt.Errorf("DeviceTaintRule %s: selector #%d: CEL compile error: %w", taintRule.Name, i, expr.Error) + } + matches, details, err := expr.DeviceMatches(ctx, cel.Device{Driver: slice.Spec.Driver, Attributes: deviceAttributes, Capacity: deviceCapacity}) + logger.V(7).Info("CEL result", "selector", i, "expression", expr.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err) + if err != nil { + if t.recorder != nil { + t.recorder.Eventf(taintRule, v1.EventTypeWarning, "CELRuntimeError", "selector #%d: runtime error: %v", i, err) + } + continue devices + } + if !matches { + continue devices + } + } + + logger.V(6).Info("applying matching DeviceTaintRule") + + // TODO: remove conversion once taint is already in the right API package. + ta := resourceapi.DeviceTaint{ + Key: taintRule.Spec.Taint.Key, + Value: taintRule.Spec.Taint.Value, + Effect: resourceapi.DeviceTaintEffect(taintRule.Spec.Taint.Effect), + TimeAdded: taintRule.Spec.Taint.TimeAdded, + } + + if patchedSlice == slice { + patchedSlice = slice.DeepCopy() + } + + appendTaint(&patchedSlice.Spec.Devices[dIndex], ta) + } + } + + return patchedSlice, nil +} + +func getAttributes(device resourceapi.Device) map[resourceapi.QualifiedName]resourceapi.DeviceAttribute { + if device.Basic != nil { + return device.Basic.Attributes + } + return nil +} + +func getCapacity(device resourceapi.Device) map[resourceapi.QualifiedName]resourceapi.DeviceCapacity { + if device.Basic != nil { + return device.Basic.Capacity + } + return nil +} + +func getTaints(device resourceapi.Device) []resourceapi.DeviceTaint { + if device.Basic != nil { + return device.Basic.Taints + } + return nil +} + +func appendTaint(device *resourceapi.Device, taint resourceapi.DeviceTaint) { + if device.Basic != nil { + device.Basic.Taints = append(device.Basic.Taints, taint) + return + } +} + +func taintsEqual(a, b resourceapi.DeviceTaint) bool { + return a.Key == b.Key && + a.Effect == b.Effect && + 
a.Value == b.Value && + a.TimeAdded.Equal(b.TimeAdded) // Equal deals with nil. +} + +func deviceID(driver, pool, device string) string { + return driver + "/" + pool + "/" + device +} + +func typedSlice[T any](objs []any) []T { + if objs == nil { + return nil + } + typed := make([]T, 0, len(objs)) + for _, obj := range objs { + typed = append(typed, obj.(T)) + } + return typed +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker_test.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker_test.go new file mode 100644 index 00000000000..1e5feca7cea --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker_test.go @@ -0,0 +1,1764 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package tracker + +import ( + stdcmp "cmp" + "context" + "slices" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + resourcealphaapi "k8s.io/api/resource/v1alpha3" + resourceapi "k8s.io/api/resource/v1beta1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" + _ "k8s.io/klog/v2/ktesting/init" + "k8s.io/utils/ptr" +) + +type handlerEventType string + +const ( + handlerEventAdd handlerEventType = "add" + handlerEventUpdate handlerEventType = "update" + handlerEventDelete handlerEventType = "delete" +) + +type handlerEvent struct { + event handlerEventType + oldObj *resourceapi.ResourceSlice + newObj *resourceapi.ResourceSlice +} + +type inputEventGenerator struct { + addResourceSlice func(slice *resourceapi.ResourceSlice) + deleteResourceSlice func(name string) + addDeviceTaintRule func(taintRule *resourcealphaapi.DeviceTaintRule) + deleteDeviceTaintRule func(name string) + addDeviceClass func(class *resourceapi.DeviceClass) + deleteDeviceClass func(name string) +} + +func inputEventGeneratorForTest(ctx context.Context, t *testing.T, tracker *Tracker) inputEventGenerator { + return inputEventGenerator{ + addResourceSlice: func(slice *resourceapi.ResourceSlice) { + oldObj, exists, err := tracker.resourceSlices.GetIndexer().Get(slice) + require.NoError(t, err) + err = tracker.resourceSlices.GetIndexer().Add(slice) + require.NoError(t, err) + if !exists { + tracker.resourceSliceAdd(ctx)(slice) + } else if !apiequality.Semantic.DeepEqual(oldObj, slice) { + tracker.resourceSliceUpdate(ctx)(oldObj, slice) + } + }, + deleteResourceSlice: func(name string) { + oldObj, exists, err := tracker.resourceSlices.GetIndexer().GetByKey(name) + require.NoError(t, err) + require.True(t, exists, 
"deleting resource slice that was never created") + err = tracker.resourceSlices.GetIndexer().Delete(oldObj) + require.NoError(t, err) + tracker.resourceSliceDelete(ctx)(oldObj) + }, + addDeviceTaintRule: func(taintRule *resourcealphaapi.DeviceTaintRule) { + oldObj, exists, err := tracker.deviceTaints.GetIndexer().Get(taintRule) + require.NoError(t, err) + err = tracker.deviceTaints.GetIndexer().Add(taintRule) + require.NoError(t, err) + if !exists { + tracker.deviceTaintAdd(ctx)(taintRule) + } else if !apiequality.Semantic.DeepEqual(oldObj, taintRule) { + tracker.deviceTaintUpdate(ctx)(oldObj, taintRule) + } + }, + deleteDeviceTaintRule: func(name string) { + oldObj, exists, err := tracker.deviceTaints.GetIndexer().GetByKey(name) + require.NoError(t, err) + require.True(t, exists, "deleting DeviceTaintRule that was never created") + err = tracker.deviceTaints.GetIndexer().Delete(oldObj) + require.NoError(t, err) + tracker.deviceTaintDelete(ctx)(oldObj) + }, + addDeviceClass: func(class *resourceapi.DeviceClass) { + oldObj, exists, err := tracker.deviceClasses.GetIndexer().Get(class) + require.NoError(t, err) + err = tracker.deviceClasses.GetIndexer().Add(class) + require.NoError(t, err) + if !exists { + tracker.deviceClassAdd(ctx)(class) + } else if !apiequality.Semantic.DeepEqual(oldObj, class) { + tracker.deviceClassUpdate(ctx)(oldObj, class) + } + }, + deleteDeviceClass: func(name string) { + oldObj, exists, err := tracker.deviceClasses.GetIndexer().GetByKey(name) + require.NoError(t, err) + require.True(t, exists, "deleting device class that was never created") + err = tracker.deviceClasses.GetIndexer().Delete(oldObj) + require.NoError(t, err) + tracker.deviceClassDelete(ctx)(oldObj) + }, + } +} + +func TestListPatchedResourceSlices(t *testing.T) { + now, _ := time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + + tests := map[string]struct { + deviceTaintsDisabled bool + inputEvents func(event inputEventGenerator) + expectedPatchedSlices 
[]*resourceapi.ResourceSlice + expectHandlerEvents func(t *testing.T, events []handlerEvent) + expectEvents func(t *assert.CollectT, events *v1.EventList) + expectUnhandledErrors func(t *testing.T, errs []error) + }{ + "add-slices-no-patches": { + inputEvents: func(event inputEventGenerator) { + event.addResourceSlice(&resourceapi.ResourceSlice{ObjectMeta: metav1.ObjectMeta{Name: "s1"}}) + event.addResourceSlice(&resourceapi.ResourceSlice{ObjectMeta: metav1.ObjectMeta{Name: "s2"}}) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + {ObjectMeta: metav1.ObjectMeta{Name: "s1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "s2"}}, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 2) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "s1", events[0].newObj.Name) + assert.Equal(t, handlerEventAdd, events[1].event) + assert.Equal(t, "s2", events[1].newObj.Name) + }, + }, + "update-slices-no-patches": { + inputEvents: func(event inputEventGenerator) { + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "s1", + }, + Spec: resourceapi.ResourceSliceSpec{ + // no devices + Devices: nil, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "s2", + }, + Spec: resourceapi.ResourceSliceSpec{ + // no devices + Devices: nil, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ObjectMeta: metav1.ObjectMeta{Name: "no-change"}}) + + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "s1", + }, + Spec: resourceapi.ResourceSliceSpec{ + // devices! + Devices: []resourceapi.Device{ + {Basic: &resourceapi.BasicDevice{}}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "s2", + }, + Spec: resourceapi.ResourceSliceSpec{ + // devices! 
+ Devices: []resourceapi.Device{ + {Basic: &resourceapi.BasicDevice{}}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ObjectMeta: metav1.ObjectMeta{Name: "no-change"}}) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "s1", + }, + Spec: resourceapi.ResourceSliceSpec{ + Devices: []resourceapi.Device{ + {Basic: &resourceapi.BasicDevice{}}, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "s2", + }, + Spec: resourceapi.ResourceSliceSpec{ + Devices: []resourceapi.Device{ + {Basic: &resourceapi.BasicDevice{}}, + }, + }, + }, + {ObjectMeta: metav1.ObjectMeta{Name: "no-change"}}, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 5) { + return + } + // The first events are adds. + assert.Equal(t, handlerEventUpdate, events[3].event) + assert.Equal(t, "s1", events[3].newObj.Name) + assert.Equal(t, "s1", events[3].oldObj.Name) + assert.Nil(t, events[3].oldObj.Spec.Devices) + assert.NotNil(t, events[3].newObj.Spec.Devices) + assert.Equal(t, handlerEventUpdate, events[4].event) + assert.Equal(t, "s2", events[4].newObj.Name) + assert.Equal(t, "s2", events[4].oldObj.Name) + assert.Nil(t, events[4].oldObj.Spec.Devices) + assert.NotNil(t, events[4].newObj.Spec.Devices) + }, + }, + "delete-slices": { + inputEvents: func(event inputEventGenerator) { + event.addResourceSlice(&resourceapi.ResourceSlice{ObjectMeta: metav1.ObjectMeta{Name: "s1"}}) + event.addResourceSlice(&resourceapi.ResourceSlice{ObjectMeta: metav1.ObjectMeta{Name: "s2"}}) + event.addResourceSlice(&resourceapi.ResourceSlice{ObjectMeta: metav1.ObjectMeta{Name: "keep-me"}}) + event.deleteResourceSlice("s1") + event.deleteResourceSlice("s2") + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + {ObjectMeta: metav1.ObjectMeta{Name: "keep-me"}}, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 5) { + return + } + // The first 
events are adds. + assert.Equal(t, handlerEventDelete, events[3].event) + assert.Equal(t, "s1", events[3].oldObj.Name) + assert.Equal(t, handlerEventDelete, events[4].event) + assert.Equal(t, "s2", events[4].oldObj.Name) + }, + }, + "patch-all-slices": { + inputEvents: func(event inputEventGenerator) { + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "all-slices", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: nil, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 2) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice", events[0].newObj.Name) + assert.Equal(t, handlerEventUpdate, events[1].event) + assert.Equal(t, "slice", events[1].newObj.Name) + }, + }, + "update-patch": { + inputEvents: func(event inputEventGenerator) { + taintRule := &resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "taintRule", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + Pool: 
ptr.To("pool-1"), + }, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + } + event.addDeviceTaintRule(taintRule.DeepCopy()) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice-1", + }, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{ + Name: "pool-1", + }, + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice-2", + }, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{ + Name: "pool-2", + }, + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + taintRule.Spec.DeviceSelector.Pool = ptr.To("pool-2") + event.addDeviceTaintRule(taintRule) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice-1", + }, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{ + Name: "pool-1", + }, + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice-2", + }, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{ + Name: "pool-2", + }, + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 4) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice-1", events[0].newObj.Name) + assert.Equal(t, handlerEventAdd, events[1].event) + assert.Equal(t, 
"slice-2", events[1].newObj.Name) + + assert.Equal(t, handlerEventUpdate, events[2].event) + assert.Equal(t, handlerEventUpdate, events[3].event) + assert.ElementsMatch(t, []string{"slice-1", "slice-2"}, []string{events[2].newObj.Name, events[3].newObj.Name}) + }, + }, + "merge-taints": { + inputEvents: func(event inputEventGenerator) { + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "merge", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: nil, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint2", + Value: "tainted2", + Effect: resourceapi.DeviceTaintEffectNoSchedule, + }}, + }, + }, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{ + { + Key: "example.com/taint2", + Value: "tainted2", + Effect: resourceapi.DeviceTaintEffectNoSchedule, + }, + { + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 1) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice", events[0].newObj.Name) + }, + }, + "add-taint-for-driver": { + inputEvents: 
func(event inputEventGenerator) { + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "driver", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + Driver: ptr.To("test.example.com"), + }, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-driver", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "wrong.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-driver", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "wrong.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 2) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice", events[0].newObj.Name) + assert.Equal(t, handlerEventAdd, 
events[1].event) + assert.Equal(t, "wrong-driver", events[1].newObj.Name) + }, + }, + "add-taint-for-pool": { + inputEvents: func(event inputEventGenerator) { + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pool", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + Pool: ptr.To("pool"), + }, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{ + Name: "pool", + }, + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-pool", + }, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{ + Name: "other", + }, + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{ + Name: "pool", + }, + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-pool", + }, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{ + Name: "other", + }, + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events 
[]handlerEvent) { + if !assert.Len(t, events, 2) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice", events[0].newObj.Name) + assert.Equal(t, handlerEventAdd, events[1].event) + assert.Equal(t, "wrong-pool", events[1].newObj.Name) + }, + }, + "add-taint-for-device": { + inputEvents: func(event inputEventGenerator) { + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "device", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + Device: ptr.To("device"), + }, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{ + Name: "pool", + }, + Devices: []resourceapi.Device{ + { + Name: "device", + Basic: &resourceapi.BasicDevice{}, + }, + { + Name: "wrong-device", + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{ + Name: "pool", + }, + Devices: []resourceapi.Device{ + { + Name: "device", + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + { + Name: "wrong-device", + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 1) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice", events[0].newObj.Name) + }, + 
}, + "add-attribute-for-selector": { + inputEvents: func(event inputEventGenerator) { + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "selector", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + Selectors: []resourcealphaapi.DeviceSelector{ + { + CEL: &resourcealphaapi.CELDeviceSelector{ + Expression: `device.driver == "test.example.com"`, + }, + }, + }, + }, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-driver", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "wrong.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-driver", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "wrong.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if 
!assert.Len(t, events, 2) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice", events[0].newObj.Name) + assert.Equal(t, handlerEventAdd, events[1].event) + assert.Equal(t, "wrong-driver", events[1].newObj.Name) + }, + }, + "selector-does-not-match": { + inputEvents: func(event inputEventGenerator) { + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "selector", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + Selectors: []resourcealphaapi.DeviceSelector{ + { + CEL: &resourcealphaapi.CELDeviceSelector{ + Expression: `true`, + }, + }, + { + CEL: &resourcealphaapi.CELDeviceSelector{ + Expression: `false`, + }, + }, + { + CEL: &resourcealphaapi.CELDeviceSelector{ + Expression: `true`, + }, + }, + }, + }, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 1) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice", events[0].newObj.Name) + }, + }, + "runtime-CEL-errors-skip-devices": { + inputEvents: func(event inputEventGenerator) { + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + 
ObjectMeta: metav1.ObjectMeta{ + Name: "selector", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + Selectors: []resourcealphaapi.DeviceSelector{ + { + CEL: &resourcealphaapi.CELDeviceSelector{ + Expression: `device.attributes["test.example.com"].deviceAttr`, + }, + }, + }, + }, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + { + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + { + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + }, + }, + }, + }, + expectEvents: func(t *assert.CollectT, events *v1.EventList) { + if !assert.Len(t, events.Items, 1) { + return + } + assert.Equal(t, "selector", events.Items[0].InvolvedObject.Name) + assert.Equal(t, "CELRuntimeError", events.Items[0].Reason) + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 1) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice", 
events[0].newObj.Name) + }, + }, + "invalid-CEL-expression-throws-error": { + inputEvents: func(event inputEventGenerator) { + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "selector", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + Selectors: []resourcealphaapi.DeviceSelector{ + { + CEL: &resourcealphaapi.CELDeviceSelector{ + Expression: `invalid`, + }, + }, + }, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{}, + expectUnhandledErrors: func(t *testing.T, errs []error) { + if !assert.Len(t, errs, 1) { + return + } + assert.ErrorContains(t, errs[0], "CEL compile error") + }, + }, + "add-taint-for-device-class": { + inputEvents: func(event inputEventGenerator) { + event.addDeviceClass(&resourceapi.DeviceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "class.example.com", + }, + Spec: resourceapi.DeviceClassSpec{ + Selectors: []resourceapi.DeviceSelector{ + { + CEL: &resourceapi.CELDeviceSelector{ + Expression: `device.driver == "test.example.com"`, + }, + }, + }, + }, + }) + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "device-class", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + DeviceClassName: ptr.To("class.example.com"), + }, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + 
Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-driver", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "wrong.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-driver", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "wrong.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 2) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice", events[0].newObj.Name) + assert.Equal(t, handlerEventAdd, events[1].event) + assert.Equal(t, "wrong-driver", events[1].newObj.Name) + }, + }, + "filter-all-criteria": { + inputEvents: func(event inputEventGenerator) { + event.addDeviceClass(&resourceapi.DeviceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "class.example.com", + }, + Spec: resourceapi.DeviceClassSpec{ + Selectors: []resourceapi.DeviceSelector{ + { + CEL: &resourceapi.CELDeviceSelector{ + Expression: `device.driver == "test.example.com"`, + }, + }, + }, + }, + }) + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "all-criteria", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + 
DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + DeviceClassName: ptr.To("class.example.com"), + Driver: ptr.To("test.example.com"), + Pool: ptr.To("pool"), + Device: ptr.To("device"), + Selectors: []resourcealphaapi.DeviceSelector{ + { + CEL: &resourcealphaapi.CELDeviceSelector{ + Expression: `true`, + }, + }, + }, + }, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Pool: resourceapi.ResourcePool{ + Name: "pool", + }, + Devices: []resourceapi.Device{ + { + Name: "device", + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + event.addResourceSlice(&resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-driver", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "wrong.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "slice", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "test.example.com", + Pool: resourceapi.ResourcePool{ + Name: "pool", + }, + Devices: []resourceapi.Device{ + { + Name: "device", + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "wrong-driver", + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: "wrong.example.com", + Devices: []resourceapi.Device{ + { + Basic: &resourceapi.BasicDevice{}, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if 
!assert.Len(t, events, 2) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "slice", events[0].newObj.Name) + assert.Equal(t, handlerEventAdd, events[1].event) + assert.Equal(t, "wrong-driver", events[1].newObj.Name) + }, + }, + "update-patched-slice": { + inputEvents: func(event inputEventGenerator) { + event.addDeviceTaintRule(&resourcealphaapi.DeviceTaintRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "all-slices", + }, + Spec: resourcealphaapi.DeviceTaintRuleSpec{ + DeviceSelector: &resourcealphaapi.DeviceTaintSelector{ + Device: ptr.To("device-1"), + }, + Taint: resourcealphaapi.DeviceTaint{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourcealphaapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }, + }, + }) + oneDevice := []resourceapi.Device{ + {Name: "device-1", Basic: &resourceapi.BasicDevice{}}, + } + threeDevices := []resourceapi.Device{ + {Name: "device-0", Basic: &resourceapi.BasicDevice{}}, + {Name: "device-1", Basic: &resourceapi.BasicDevice{}}, + {Name: "device-2", Basic: &resourceapi.BasicDevice{}}, + } + devicesAdded := &resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "devices-added", + }, + Spec: resourceapi.ResourceSliceSpec{ + Devices: oneDevice, + }, + } + devicesRemoved := &resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "devices-removed", + }, + Spec: resourceapi.ResourceSliceSpec{ + Devices: threeDevices, + }, + } + event.addResourceSlice(devicesAdded.DeepCopy()) + devicesAdded.Spec.Devices = threeDevices + event.addResourceSlice(devicesAdded) + event.addResourceSlice(devicesRemoved.DeepCopy()) + devicesRemoved.Spec.Devices = oneDevice + event.addResourceSlice(devicesRemoved) + }, + expectedPatchedSlices: []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "devices-added", + }, + Spec: resourceapi.ResourceSliceSpec{ + Devices: []resourceapi.Device{ + {Name: "device-0", Basic: &resourceapi.BasicDevice{}}, + { 
+ Name: "device-1", + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + {Name: "device-2", Basic: &resourceapi.BasicDevice{}}, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "devices-removed", + }, + Spec: resourceapi.ResourceSliceSpec{ + Devices: []resourceapi.Device{ + { + Name: "device-1", + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Key: "example.com/taint", + Value: "tainted", + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &metav1.Time{Time: now}, + }}, + }, + }, + }, + }, + }, + }, + expectHandlerEvents: func(t *testing.T, events []handlerEvent) { + if !assert.Len(t, events, 4) { + return + } + assert.Equal(t, handlerEventAdd, events[0].event) + assert.Equal(t, "devices-added", events[0].newObj.Name) + assert.Equal(t, handlerEventUpdate, events[1].event) + assert.Equal(t, "devices-added", events[1].newObj.Name) + assert.Equal(t, handlerEventAdd, events[2].event) + assert.Equal(t, "devices-removed", events[2].newObj.Name) + assert.Equal(t, handlerEventUpdate, events[3].event) + assert.Equal(t, "devices-removed", events[3].newObj.Name) + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + + kubeClient := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 10*time.Minute) + + var handlerEvents []handlerEvent + handler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + handlerEvents = append(handlerEvents, handlerEvent{event: handlerEventAdd, newObj: obj.(*resourceapi.ResourceSlice)}) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + handlerEvents = append(handlerEvents, handlerEvent{event: handlerEventUpdate, oldObj: oldObj.(*resourceapi.ResourceSlice), newObj: 
newObj.(*resourceapi.ResourceSlice)}) + }, + DeleteFunc: func(obj interface{}) { + handlerEvents = append(handlerEvents, handlerEvent{event: handlerEventDelete, oldObj: obj.(*resourceapi.ResourceSlice)}) + }, + } + + opts := Options{ + EnableDeviceTaints: !test.deviceTaintsDisabled, + SliceInformer: informerFactory.Resource().V1beta1().ResourceSlices(), + TaintInformer: informerFactory.Resource().V1alpha3().DeviceTaintRules(), + ClassInformer: informerFactory.Resource().V1beta1().DeviceClasses(), + KubeClient: kubeClient, + } + tracker, err := newTracker(ctx, opts) + require.NoError(t, err) + var unhandledErrors []error + tracker.handleError = func(_ context.Context, err error, _ string, _ ...any) { + unhandledErrors = append(unhandledErrors, err) + } + _, _ = tracker.AddEventHandler(handler) + + if test.inputEvents != nil { + test.inputEvents(inputEventGeneratorForTest(ctx, t, tracker)) + } + + expectHandlerEvents := test.expectHandlerEvents + if expectHandlerEvents == nil { + expectHandlerEvents = func(t *testing.T, events []handlerEvent) { + assert.Empty(t, events) + } + } + expectHandlerEvents(t, handlerEvents) + + expectUnhandledErrors := test.expectUnhandledErrors + if expectUnhandledErrors == nil { + expectUnhandledErrors = func(t *testing.T, errs []error) { + assert.Empty(t, errs) + } + } + expectUnhandledErrors(t, unhandledErrors) + + // Check ResourceSlices + patchedResourceSlices, err := tracker.ListPatchedResourceSlices() + require.NoError(t, err, "list patched resource slices") + sortResourceSlicesFunc := func(s1, s2 *resourceapi.ResourceSlice) int { + return stdcmp.Compare(s1.Name, s2.Name) + } + slices.SortFunc(test.expectedPatchedSlices, sortResourceSlicesFunc) + slices.SortFunc(patchedResourceSlices, sortResourceSlicesFunc) + assert.Equal(t, test.expectedPatchedSlices, patchedResourceSlices) + expectEvents := test.expectEvents + if expectEvents == nil { + expectEvents = func(t *assert.CollectT, events *v1.EventList) { + assert.Empty(t, 
events.Items)
				}
			}
			// Events are generated asynchronously. While shutting down the event recorder will flush all
			// pending events, it is not possible to determine when exactly that flush is complete.
			assert.EventuallyWithT(
				t,
				func(t *assert.CollectT) {
					events, err := kubeClient.CoreV1().Events("").List(ctx, metav1.ListOptions{})
					require.NoError(t, err, "list events")
					expectEvents(t, events)
				},
				1*time.Second,
				10*time.Millisecond,
				"did not observe expected events",
			)
		})
	}
}

// BenchmarkEventHandlers measures the tracker's event handlers by calling
// them directly instead of going through running informers.
//
// Each benchmark case provides:
//   - resourceSlices: fixtures added to the tracker's ResourceSlice indexer
//     before the timer is reset,
//   - taintRules: fixtures added to the tracker's DeviceTaintRule indexer
//     before the timer is reset,
//   - loop: the operation executed once per iteration; i is the iteration
//     index in [0, b.N).
//
// A fresh tracker is created for every sub-benchmark so that cases do not
// share state.
func BenchmarkEventHandlers(b *testing.B) {
	now := time.Now()
	benchmarks := map[string]struct {
		resourceSlices []*resourceapi.ResourceSlice
		taintRules     []*resourcealphaapi.DeviceTaintRule
		loop           func(ctx context.Context, b *testing.B, tracker *Tracker, resourceSlices []*resourceapi.ResourceSlice, taintRules []*resourcealphaapi.DeviceTaintRule, i int)
	}{
		// 1000 slices with 64 unnamed devices each and no taint rules;
		// each iteration feeds one slice to the add handler.
		"resource-slice-add-no-taint-rules": {
			resourceSlices: func() []*resourceapi.ResourceSlice {
				resourceSlices := make([]*resourceapi.ResourceSlice, 1000)
				for i := range resourceSlices {
					resourceSlices[i] = &resourceapi.ResourceSlice{
						ObjectMeta: metav1.ObjectMeta{
							Name: "slice-" + strconv.Itoa(i),
						},
						Spec: resourceapi.ResourceSliceSpec{
							// NOTE(review): slices.Repeat copies the Device value, so all 64
							// entries share the same *BasicDevice pointer. Confirm that the
							// tracker never mutates devices in place.
							Devices: slices.Repeat([]resourceapi.Device{{Basic: &resourceapi.BasicDevice{}}}, 64),
						},
					}
				}
				return resourceSlices
			}(),
			loop: func(ctx context.Context, b *testing.B, tracker *Tracker, resourceSlices []*resourceapi.ResourceSlice, _ []*resourcealphaapi.DeviceTaintRule, i int) {
				tracker.resourceSliceAdd(ctx)(resourceSlices[i%len(resourceSlices)])
			},
		},
		// 500 slices and a single rule without a device selector; each
		// iteration feeds that rule to the taint-rule add handler.
		"one-patch-to-many-slices-add-taint-rule": {
			resourceSlices: func() []*resourceapi.ResourceSlice {
				resourceSlices := make([]*resourceapi.ResourceSlice, 500)
				for i := range resourceSlices {
					resourceSlices[i] = &resourceapi.ResourceSlice{
						ObjectMeta: metav1.ObjectMeta{
							Name: "slice-" + strconv.Itoa(i),
						},
						Spec: resourceapi.ResourceSliceSpec{
							Devices: slices.Repeat([]resourceapi.Device{{Basic: &resourceapi.BasicDevice{}}}, 64),
						},
					}
				}
				return resourceSlices
			}(),
			taintRules: []*resourcealphaapi.DeviceTaintRule{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "taintRule",
					},
					Spec: resourcealphaapi.DeviceTaintRuleSpec{
						DeviceSelector: nil, // all slices
						Taint: resourcealphaapi.DeviceTaint{
							Key:       "example.com/taint",
							Value:     "tainted",
							Effect:    resourcealphaapi.DeviceTaintEffectNoExecute,
							TimeAdded: &metav1.Time{Time: now},
						},
					},
				},
			},
			loop: func(ctx context.Context, b *testing.B, tracker *Tracker, resourceSlices []*resourceapi.ResourceSlice, taintRules []*resourcealphaapi.DeviceTaintRule, i int) {
				tracker.deviceTaintAdd(ctx)(taintRules[i%len(taintRules)])
			},
		},
		// Same fixtures as the previous case, but each iteration feeds one
		// slice to the slice add handler instead.
		"one-patch-to-many-slices-add-slice": {
			resourceSlices: func() []*resourceapi.ResourceSlice {
				resourceSlices := make([]*resourceapi.ResourceSlice, 500)
				for i := range resourceSlices {
					resourceSlices[i] = &resourceapi.ResourceSlice{
						ObjectMeta: metav1.ObjectMeta{
							Name: "slice-" + strconv.Itoa(i),
						},
						Spec: resourceapi.ResourceSliceSpec{
							Devices: slices.Repeat([]resourceapi.Device{{Basic: &resourceapi.BasicDevice{}}}, 64),
						},
					}
				}
				return resourceSlices
			}(),
			taintRules: []*resourcealphaapi.DeviceTaintRule{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "taintRule",
					},
					Spec: resourcealphaapi.DeviceTaintRuleSpec{
						DeviceSelector: nil, // all slices
						Taint: resourcealphaapi.DeviceTaint{
							Key:       "example.com/taint",
							Value:     "tainted",
							Effect:    resourcealphaapi.DeviceTaintEffectNoExecute,
							TimeAdded: &metav1.Time{Time: now},
						},
					},
				},
			},
			loop: func(ctx context.Context, b *testing.B, tracker *Tracker, resourceSlices []*resourceapi.ResourceSlice, _ []*resourcealphaapi.DeviceTaintRule, i int) {
				tracker.resourceSliceAdd(ctx)(resourceSlices[i%len(resourceSlices)])
			},
		},
		// 500 slices in distinct pools with 64 individually named devices
		// each. Exactly one device (in the middle slice) is named "patchme",
		// and the single rule selects devices by that name.
		"one-patched-device-among-many-slices-add-taint-rule": {
			resourceSlices: func() []*resourceapi.ResourceSlice {
				nSlices := 500
				nDevices := 64
				resourceSlices := make([]*resourceapi.ResourceSlice, nSlices)
				for i := range resourceSlices {
					resourceSlices[i] = &resourceapi.ResourceSlice{
						ObjectMeta: metav1.ObjectMeta{
							Name: "slice-" + strconv.Itoa(i),
						},
						Spec: resourceapi.ResourceSliceSpec{
							Pool: resourceapi.ResourcePool{
								Name: "pool-" + strconv.Itoa(i),
							},
							Devices: func() []resourceapi.Device {
								devices := make([]resourceapi.Device, nDevices)
								for j := range devices {
									devices[j] = resourceapi.Device{
										Name:  "device-" + strconv.Itoa(j),
										Basic: &resourceapi.BasicDevice{},
									}
								}
								return devices
							}(),
						},
					}
				}
				resourceSlices[nSlices/2].Spec.Devices[nDevices/2].Name = "patchme"
				return resourceSlices
			}(),
			taintRules: []*resourcealphaapi.DeviceTaintRule{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "taintRule",
					},
					Spec: resourcealphaapi.DeviceTaintRuleSpec{
						DeviceSelector: &resourcealphaapi.DeviceTaintSelector{
							Device: ptr.To("patchme"),
						},
						Taint: resourcealphaapi.DeviceTaint{
							Key:       "example.com/taint",
							Value:     "tainted",
							Effect:    resourcealphaapi.DeviceTaintEffectNoExecute,
							TimeAdded: &metav1.Time{Time: now},
						},
					},
				},
			},
			loop: func(ctx context.Context, b *testing.B, tracker *Tracker, resourceSlices []*resourceapi.ResourceSlice, taintRules []*resourcealphaapi.DeviceTaintRule, i int) {
				tracker.deviceTaintAdd(ctx)(taintRules[i%len(taintRules)])
			},
		},
		// 500 slices in distinct pools; every slice has a device named
		// "patchme" at index 32. The single rule selects pool "pool-250" and
		// device "patchme", and every iteration adds the slice of that pool.
		"one-patched-device-among-many-slices-add-slice": {
			resourceSlices: func() []*resourceapi.ResourceSlice {
				resourceSlices := make([]*resourceapi.ResourceSlice, 500)
				for i := range resourceSlices {
					resourceSlices[i] = &resourceapi.ResourceSlice{
						ObjectMeta: metav1.ObjectMeta{
							Name: "slice-" + strconv.Itoa(i),
						},
						Spec: resourceapi.ResourceSliceSpec{
							Pool: resourceapi.ResourcePool{
								Name: "pool-" + strconv.Itoa(i),
							},
							Devices: func() []resourceapi.Device {
								nDevices := 64
								devices := slices.Repeat([]resourceapi.Device{{Basic: &resourceapi.BasicDevice{}}}, nDevices)
								devices[nDevices/2].Name = "patchme"
								return devices
							}(),
						},
					}
				}
				return resourceSlices
			}(),
			taintRules: []*resourcealphaapi.DeviceTaintRule{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "patch",
					},
					Spec: resourcealphaapi.DeviceTaintRuleSpec{
						DeviceSelector: &resourcealphaapi.DeviceTaintSelector{
							Pool:   ptr.To("pool-250"),
							Device: ptr.To("patchme"),
						},
						Taint: resourcealphaapi.DeviceTaint{
							Key:       "example.com/taint",
							Value:     "tainted",
							Effect:    resourcealphaapi.DeviceTaintEffectNoExecute,
							TimeAdded: &metav1.Time{Time: now},
						},
					},
				},
			},
			loop: func(ctx context.Context, b *testing.B, tracker *Tracker, resourceSlices []*resourceapi.ResourceSlice, _ []*resourcealphaapi.DeviceTaintRule, i int) {
				tracker.resourceSliceAdd(ctx)(resourceSlices[250]) // the slice in pool-250, which the taint rule targets
			},
		},
		// 500 slices in distinct pools and 500 rules, rule i selecting pool
		// "pool-<i>"; each iteration feeds one rule to the add handler.
		"one-patch-for-each-of-many-slices-add-taint-rule": {
			resourceSlices: func() []*resourceapi.ResourceSlice {
				resourceSlices := make([]*resourceapi.ResourceSlice, 500)
				for i := range resourceSlices {
					resourceSlices[i] = &resourceapi.ResourceSlice{
						ObjectMeta: metav1.ObjectMeta{
							Name: "slice-" + strconv.Itoa(i),
						},
						Spec: resourceapi.ResourceSliceSpec{
							Pool: resourceapi.ResourcePool{
								Name: "pool-" + strconv.Itoa(i),
							},
							Devices: slices.Repeat([]resourceapi.Device{{Basic: &resourceapi.BasicDevice{}}}, 64),
						},
					}
				}
				return resourceSlices
			}(),
			taintRules: func() []*resourcealphaapi.DeviceTaintRule {
				taintRules := make([]*resourcealphaapi.DeviceTaintRule, 500)
				for i := range taintRules {
					taintRules[i] = &resourcealphaapi.DeviceTaintRule{
						ObjectMeta: metav1.ObjectMeta{
							Name: "taint-rule-" + strconv.Itoa(i),
						},
						Spec: resourcealphaapi.DeviceTaintRuleSpec{
							DeviceSelector: &resourcealphaapi.DeviceTaintSelector{
								Pool: ptr.To("pool-" + strconv.Itoa(i)),
							},
							Taint: resourcealphaapi.DeviceTaint{
								Key:       "example.com/taint",
								Value:     "tainted",
								Effect:    resourcealphaapi.DeviceTaintEffectNoExecute,
								TimeAdded: &metav1.Time{Time: now},
							},
						},
					}
				}
				return taintRules
			}(),
			loop: func(ctx context.Context, b *testing.B, tracker *Tracker, resourceSlices []*resourceapi.ResourceSlice, taintRules []*resourcealphaapi.DeviceTaintRule, i int) {
				tracker.deviceTaintAdd(ctx)(taintRules[i%len(taintRules)])
			},
		},
	}

	// newBenchTracker builds a tracker backed by a fake clientset with device
	// taints enabled. Failures are reported through tb, which must be the
	// testing.TB of the (sub-)benchmark that is currently running: FailNow
	// has to be called from that benchmark's own goroutine, and errors
	// should be attributed to the named case rather than to the parent.
	newBenchTracker := func(ctx context.Context, tb testing.TB) *Tracker {
		kubeClient := fake.NewSimpleClientset()
		informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 10*time.Minute)
		opts := Options{
			EnableDeviceTaints: true,
			SliceInformer:      informerFactory.Resource().V1beta1().ResourceSlices(),
			TaintInformer:      informerFactory.Resource().V1alpha3().DeviceTaintRules(),
			ClassInformer:      informerFactory.Resource().V1beta1().DeviceClasses(),
			KubeClient:         kubeClient,
		}
		tracker, err := newTracker(ctx, opts)
		require.NoError(tb, err)
		// Any error the tracker cannot handle itself fails the benchmark.
		tracker.handleError = func(_ context.Context, err error, _ string, _ ...any) {
			tb.Error("unexpected unhandled error:", err)
		}
		return tracker
	}

	for name, benchmark := range benchmarks {
		b.Run(name, func(b *testing.B) {
			logger, ctx := ktesting.NewTestContext(b)
			ctx = klog.NewContext(ctx, logger.V(2))
			tracker := newBenchTracker(ctx, b)

			// Seed the indexers directly; no informer is started here.
			for _, slice := range benchmark.resourceSlices {
				err := tracker.resourceSlices.GetIndexer().Add(slice)
				require.NoError(b, err)
			}

			for _, taintRule := range benchmark.taintRules {
				err := tracker.deviceTaints.GetIndexer().Add(taintRule)
				require.NoError(b, err)
			}

			// Exclude fixture setup from the measurement.
			b.ResetTimer()
			for i := range b.N {
				benchmark.loop(ctx, b, tracker, benchmark.resourceSlices, benchmark.taintRules, i)
			}
		})
	}
}