DRA: add ResourceSlice tracker

The purpose of the tracker is to emulate a ResourceSlice informer, including
cache and event handlers. In contrast to that informer, the tracker adds taints
from a DeviceTaintRule such that they appear in the ResourceSlice device
definition. Code using the tracker doesn't need to care where the taints are
coming from.

The main advantage is that it enables fine-grained reactions to taints that
only affect a few devices, the common case. Without this tracker, the pod
eviction controller would have to sync all pods when any slice or any taint
changes.

In the scheduler, it avoids re-evaluating the selection criteria
repeatedly: the tracker serves as a cross-pod scheduling cache.
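
For illustration, wiring the tracker into a component could look roughly
like this (a sketch, not part of this commit; a kubernetes.Interface
client, a ctx, the informer factory accessors, and the tracker import
path are assumptions):

    factory := informers.NewSharedInformerFactory(client, 0 /* no resync */)
    rsTracker, err := tracker.StartTracker(ctx, tracker.Options{
        EnableDeviceTaints: true,
        SliceInformer:      factory.Resource().V1beta1().ResourceSlices(),
        TaintInformer:      factory.Resource().V1alpha3().DeviceTaintRules(),
        ClassInformer:      factory.Resource().V1beta1().DeviceClasses(),
        KubeClient:         client,
    })
    if err != nil {
        return err
    }
    defer rsTracker.Stop()
    factory.Start(ctx.Done())
    // Blocks until all existing slices, taint rules and classes have been
    // processed.
    cache.WaitForCacheSync(ctx.Done(), rsTracker.HasSynced)
    // Handlers observe ResourceSlices with taints already applied.
    _, err = rsTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj any) { /* react to a patched ResourceSlice */ },
    })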
Author: Jon Huhn (2025-01-27 14:34:34 -06:00)
Committer: Patrick Ohly
parent 99dbd85c45
commit 939c9c0c6b
4 changed files with 2791 additions and 0 deletions


@@ -0,0 +1,112 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// TODO: This is duplicated from ./pkg/scheduler/util/queue because that
// package is not allowed to be imported here.
package queue
const (
// normalSize limits the size of the buffer that is kept
// for reuse.
normalSize = 4
)
// FIFO implements a first-in-first-out queue with unbounded size.
// The zero-value FIFO is a valid empty queue.
//
// Access must be protected by the caller when used concurrently by
// different goroutines; the queue itself implements no locking.
type FIFO[T any] struct {
// elements contains a buffer for elements which have been
// pushed and not popped yet. Two scenarios are possible:
// - one chunk in the middle (start <= end)
// - one chunk at the end, followed by one chunk at the
// beginning (end <= start)
//
// start == end can be either an empty queue or a completely
// full one (with two chunks).
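//
// For example (illustrative), with len(elements) == 4:
//
//	start=1, end=3: [_, e0, e1, _]  (one chunk in the middle)
//	start=3, end=1: [e1, _, _, e0]  (wrapped: e0 at the end, e1 at the start)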
elements []T
// len counts the number of elements which have been pushed and
// not popped yet.
len int
// start is the index of the first valid element.
start int
// end is the index after the last valid element.
end int
}
func (q *FIFO[T]) Len() int {
return q.len
}
func (q *FIFO[T]) Push(element T) {
size := len(q.elements)
if q.len == size {
// Need larger buffer.
newSize := size * 2
if newSize == 0 {
newSize = normalSize
}
elements := make([]T, newSize)
if q.start == 0 {
copy(elements, q.elements)
} else {
copy(elements, q.elements[q.start:])
copy(elements[len(q.elements)-q.start:], q.elements[0:q.end])
}
q.start = 0
q.end = q.len
q.elements = elements
size = newSize
}
if q.end == size {
// Wrap around.
q.elements[0] = element
q.end = 1
q.len++
return
}
q.elements[q.end] = element
q.end++
q.len++
}
func (q *FIFO[T]) Pop() (element T, ok bool) {
if q.len == 0 {
return
}
element = q.elements[q.start]
q.start++
if q.start == len(q.elements) {
// Wrap around.
q.start = 0
}
q.len--
// Once it is empty, shrink down to avoid hanging onto
// a large buffer forever.
if q.len == 0 && len(q.elements) > normalSize {
q.elements = make([]T, normalSize)
q.start = 0
q.end = 0
}
ok = true
return
}


@@ -0,0 +1,117 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"testing"
"github.com/stretchr/testify/require"
)
func verifyPop(t *testing.T, expectedValue int, expectedOk bool, queue *FIFO[int]) {
t.Helper()
actual, ok := queue.Pop()
require.Equal(t, expectedOk, ok)
require.Equal(t, expectedValue, actual)
}
func verifyEmpty(t *testing.T, queue *FIFO[int]) {
t.Helper()
require.Equal(t, 0, queue.Len())
verifyPop(t, 0, false, queue)
}
func TestNull(t *testing.T) {
var queue FIFO[int]
verifyEmpty(t, &queue)
}
func TestOnePushPop(t *testing.T) {
var queue FIFO[int]
expected := 10
queue.Push(10)
require.Equal(t, 1, queue.Len())
verifyPop(t, expected, true, &queue)
verifyEmpty(t, &queue)
}
// Pushes some elements, pops all of them, then the same again.
func TestWrapAroundEmpty(t *testing.T) {
var queue FIFO[int]
for i := 0; i < 5; i++ {
queue.Push(i)
}
require.Equal(t, 5, queue.Len())
for i := 0; i < 5; i++ {
verifyPop(t, i, true, &queue)
}
verifyEmpty(t, &queue)
for i := 5; i < 10; i++ {
queue.Push(i)
}
for i := 5; i < 10; i++ {
verifyPop(t, i, true, &queue)
}
verifyEmpty(t, &queue)
}
// Pushes some elements, pops one, adds more, then pops all.
func TestWrapAroundPartial(t *testing.T) {
var queue FIFO[int]
for i := 0; i < 5; i++ {
queue.Push(i)
}
require.Equal(t, 5, queue.Len())
verifyPop(t, 0, true, &queue)
for i := 5; i < 10; i++ {
queue.Push(i)
}
for i := 1; i < 10; i++ {
verifyPop(t, i, true, &queue)
}
verifyEmpty(t, &queue)
}
// Pushes an unusually large number of elements, pops all of them, and
// verifies that the FIFO shrinks back down again.
func TestShrink(t *testing.T) {
var queue FIFO[int]
for i := 0; i < normalSize*2; i++ {
queue.Push(i)
}
require.Equal(t, normalSize*2, queue.Len())
require.LessOrEqual(t, 2*normalSize, len(queue.elements))
// Pop all, should be shrunken when done.
for i := 0; i < normalSize*2; i++ {
verifyPop(t, i, true, &queue)
}
require.Equal(t, 0, queue.Len())
require.Len(t, queue.elements, normalSize)
// Still usable after shrinking?
queue.Push(42)
verifyPop(t, 42, true, &queue)
require.Equal(t, 0, queue.Len())
require.Len(t, queue.elements, normalSize)
}


@@ -0,0 +1,798 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tracker
import (
"context"
"errors"
"fmt"
"slices"
"sync"
"github.com/google/go-cmp/cmp" //nolint:depguard
v1 "k8s.io/api/core/v1"
resourcealphaapi "k8s.io/api/resource/v1alpha3"
resourceapi "k8s.io/api/resource/v1beta1"
labels "k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
resourcealphainformers "k8s.io/client-go/informers/resource/v1alpha3"
resourceinformers "k8s.io/client-go/informers/resource/v1beta1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
resourcelisters "k8s.io/client-go/listers/resource/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/dynamic-resource-allocation/internal/queue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
const (
driverPoolDeviceIndexName = "driverPoolDevice"
anyDriver = "*"
anyPool = "*"
anyDevice = "*"
)
// Tracker maintains a view of ResourceSlice objects with matching
// DeviceTaintRules applied. It is backed by informers to process
// potential changes to resolved ResourceSlices asynchronously.
type Tracker struct {
enableDeviceTaints bool
resourceSliceLister resourcelisters.ResourceSliceLister
resourceSlices cache.SharedIndexInformer
resourceSlicesHandle cache.ResourceEventHandlerRegistration
deviceTaints cache.SharedIndexInformer
deviceTaintsHandle cache.ResourceEventHandlerRegistration
deviceClasses cache.SharedIndexInformer
deviceClassesHandle cache.ResourceEventHandlerRegistration
celCache *cel.Cache
patchedResourceSlices cache.Store
broadcaster record.EventBroadcaster
recorder record.EventRecorder
// handleError usually refers to [utilruntime.HandleErrorWithContext] but
// may be overridden in tests.
handleError func(context.Context, error, string, ...any)
// Synchronizes updates to these fields related to event handlers.
rwMutex sync.RWMutex
// All registered event handlers.
eventHandlers []cache.ResourceEventHandler
// The eventQueue contains functions which deliver an event to one
// event handler.
//
// These functions must be invoked while *not locking* rwMutex because
// the event handlers are allowed to access the cache. Holding rwMutex
// then would cause a deadlock.
//
// New functions get added as part of processing a cache update while
// the rwMutex is locked. Each function which adds something to the queue
// also drains the queue before returning, therefore it is guaranteed
// that all event handlers get notified immediately (useful for unit
// testing).
//
// A channel cannot be used here because it cannot have an unbounded
// capacity. This could lead to a deadlock (writer holds rwMutex,
// gets blocked because capacity is exhausted, reader is in a handler
// which tries to lock the rwMutex). Writing into such a channel
// while not holding the rwMutex doesn't work because in-order delivery
// of events would no longer be guaranteed.
eventQueue queue.FIFO[func()]
}
// Options configure a [Tracker].
type Options struct {
// EnableDeviceTaints controls whether DeviceTaintRules
// will be reflected in ResourceSlices reported by the tracker.
//
// If false, then TaintInformer and ClassInformer
// are not needed. The tracker turns into
// a thin wrapper around the underlying
// SliceInformer, with no processing of its own.
EnableDeviceTaints bool
SliceInformer resourceinformers.ResourceSliceInformer
TaintInformer resourcealphainformers.DeviceTaintRuleInformer
ClassInformer resourceinformers.DeviceClassInformer
// KubeClient is used to generate Events when CEL expressions
// encounter runtime errors.
KubeClient kubernetes.Interface
}
// StartTracker creates and initializes informers for a new [Tracker].
func StartTracker(ctx context.Context, opts Options) (finalT *Tracker, finalErr error) {
if !opts.EnableDeviceTaints {
// Minimal wrapper. All public methods shortcut by calling the underlying informer.
return &Tracker{
resourceSliceLister: opts.SliceInformer.Lister(),
resourceSlices: opts.SliceInformer.Informer(),
}, nil
}
t, err := newTracker(ctx, opts)
if err != nil {
return nil, err
}
defer func() {
// If we don't return the tracker, stop the partially initialized instance.
if finalErr != nil {
t.Stop()
}
}()
if err := t.initInformers(ctx); err != nil {
return nil, fmt.Errorf("initialize informers: %w", err)
}
return t, nil
}
// newTracker is used in testing to construct a tracker without informer event handlers.
func newTracker(ctx context.Context, opts Options) (finalT *Tracker, finalErr error) {
t := &Tracker{
enableDeviceTaints: opts.EnableDeviceTaints,
resourceSliceLister: opts.SliceInformer.Lister(),
resourceSlices: opts.SliceInformer.Informer(),
deviceTaints: opts.TaintInformer.Informer(),
deviceClasses: opts.ClassInformer.Informer(),
celCache: cel.NewCache(10),
patchedResourceSlices: cache.NewStore(cache.MetaNamespaceKeyFunc),
handleError: utilruntime.HandleErrorWithContext,
}
defer func() {
// If we don't return the tracker, stop the partially initialized instance.
if finalErr != nil {
t.Stop()
}
}()
err := t.resourceSlices.AddIndexers(cache.Indexers{driverPoolDeviceIndexName: sliceDriverPoolDeviceIndexFunc})
if err != nil {
return nil, fmt.Errorf("failed to add %s index to ResourceSlice informer: %w", driverPoolDeviceIndexName, err)
}
// KubeClient is not always set in unit tests.
if opts.KubeClient != nil {
t.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
t.broadcaster.StartLogging(klog.Infof)
t.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: opts.KubeClient.CoreV1().Events("")})
t.recorder = t.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_slice_tracker"})
}
return t, nil
}
// initInformers adds event handlers to a tracker constructed with newTracker.
func (t *Tracker) initInformers(ctx context.Context) error {
var err error
sliceHandler := cache.ResourceEventHandlerFuncs{
AddFunc: t.resourceSliceAdd(ctx),
UpdateFunc: t.resourceSliceUpdate(ctx),
DeleteFunc: t.resourceSliceDelete(ctx),
}
t.resourceSlicesHandle, err = t.resourceSlices.AddEventHandler(sliceHandler)
if err != nil {
return fmt.Errorf("add event handler for ResourceSlices: %w", err)
}
taintHandler := cache.ResourceEventHandlerFuncs{
AddFunc: t.deviceTaintAdd(ctx),
UpdateFunc: t.deviceTaintUpdate(ctx),
DeleteFunc: t.deviceTaintDelete(ctx),
}
t.deviceTaintsHandle, err = t.deviceTaints.AddEventHandler(taintHandler)
if err != nil {
return fmt.Errorf("add event handler for DeviceTaintRules: %w", err)
}
classHandler := cache.ResourceEventHandlerFuncs{
AddFunc: t.deviceClassAdd(ctx),
UpdateFunc: t.deviceClassUpdate(ctx),
DeleteFunc: t.deviceClassDelete(ctx),
}
t.deviceClassesHandle, err = t.deviceClasses.AddEventHandler(classHandler)
if err != nil {
return fmt.Errorf("add event handler for DeviceClasses: %w", err)
}
return nil
}
// HasSynced returns true if the tracker is done with processing all
// currently existing input objects. Adding a new event handler at that
// point is possible and will emit events with up-to-date ResourceSlice
// objects.
func (t *Tracker) HasSynced() bool {
if !t.enableDeviceTaints {
return t.resourceSlices.HasSynced()
}
if t.resourceSlicesHandle != nil && !t.resourceSlicesHandle.HasSynced() {
return false
}
if t.deviceTaintsHandle != nil && !t.deviceTaintsHandle.HasSynced() {
return false
}
if t.deviceClassesHandle != nil && !t.deviceClassesHandle.HasSynced() {
return false
}
return true
}
// Stop ends all background activity and blocks until that shutdown is complete.
func (t *Tracker) Stop() {
if !t.enableDeviceTaints {
return
}
if t.broadcaster != nil {
t.broadcaster.Shutdown()
}
_ = t.resourceSlices.RemoveEventHandler(t.resourceSlicesHandle)
_ = t.deviceTaints.RemoveEventHandler(t.deviceTaintsHandle)
_ = t.deviceClasses.RemoveEventHandler(t.deviceClassesHandle)
}
// ListPatchedResourceSlices returns all ResourceSlices in the cluster with
// modifications from DeviceTaintRules applied.
func (t *Tracker) ListPatchedResourceSlices() ([]*resourceapi.ResourceSlice, error) {
if !t.enableDeviceTaints {
return t.resourceSliceLister.List(labels.Everything())
}
return typedSlice[*resourceapi.ResourceSlice](t.patchedResourceSlices.List()), nil
}
// AddEventHandler adds an event handler to the tracker. Events to a
// single handler are delivered sequentially, but there is no
// coordination between different handlers. A handler may use the
// tracker.
//
// The return value can be used to wait for cache synchronization.
// All currently known ResourceSlices get delivered via Add events
// before this method returns.
func (t *Tracker) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
if !t.enableDeviceTaints {
return t.resourceSlices.AddEventHandler(handler)
}
defer t.emitEvents()
t.rwMutex.Lock()
defer t.rwMutex.Unlock()
t.eventHandlers = append(t.eventHandlers, handler)
allObjs, _ := t.ListPatchedResourceSlices() // cannot fail when device taints are enabled
for _, obj := range allObjs {
t.eventQueue.Push(func() {
handler.OnAdd(obj, true)
})
}
// The tracker itself provides HasSynced for all registered event handlers.
// We don't support removal, so returning the same handle here for all
// of them is fine.
return t, nil
}
// emitEvents delivers all pending events that are in the queue, in the order
// in which they were stored there (FIFO).
func (t *Tracker) emitEvents() {
for {
t.rwMutex.Lock()
deliver, ok := t.eventQueue.Pop()
t.rwMutex.Unlock()
if !ok {
return
}
func() {
defer utilruntime.HandleCrash()
deliver()
}()
}
}
// pushEvent ensures that all currently registered event handlers get
// notified about a change when the caller starts delivering
// those with emitEvents.
//
// For a delete event, newObj is nil. For an add, oldObj is nil.
// An update has both as non-nil.
func (t *Tracker) pushEvent(oldObj, newObj any) {
t.rwMutex.Lock()
defer t.rwMutex.Unlock()
for _, handler := range t.eventHandlers {
handler := handler // capture the loop variable for the closures below (pre-Go 1.22 loop semantics)
if oldObj == nil {
t.eventQueue.Push(func() {
handler.OnAdd(newObj, false)
})
} else if newObj == nil {
t.eventQueue.Push(func() {
handler.OnDelete(oldObj)
})
} else {
t.eventQueue.Push(func() {
handler.OnUpdate(oldObj, newObj)
})
}
}
}
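// sliceDriverPoolDeviceIndexFunc computes all driver/pool/device keys under
// which a slice is indexed, with "*" standing in for "any value". For
// example (illustrative names), a slice with driver "dra.example.com",
// pool "pool-a" and the single device "dev-0" is indexed under:
//
//	*/*/*, */*/dev-0, */pool-a/*, */pool-a/dev-0,
//	dra.example.com/*/*, dra.example.com/*/dev-0,
//	dra.example.com/pool-a/*, dra.example.com/pool-a/dev-0
//
// That way, all slices which a DeviceTaintRule may apply to can be found
// with a single index lookup for the rule's (possibly wildcarded) key,
// see driverPoolDeviceIndexPatchKey.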
func sliceDriverPoolDeviceIndexFunc(obj any) ([]string, error) {
slice := obj.(*resourceapi.ResourceSlice)
drivers := []string{
anyDriver,
slice.Spec.Driver,
}
pools := []string{
anyPool,
slice.Spec.Pool.Name,
}
indexValues := make([]string, 0, len(drivers)*len(pools)*(1+len(slice.Spec.Devices)))
for _, driver := range drivers {
for _, pool := range pools {
indexValues = append(indexValues, deviceID(driver, pool, anyDevice))
for _, device := range slice.Spec.Devices {
indexValues = append(indexValues, deviceID(driver, pool, device.Name))
}
}
}
return indexValues, nil
}
func driverPoolDeviceIndexPatchKey(patch *resourcealphaapi.DeviceTaintRule) string {
deviceSelector := ptr.Deref(patch.Spec.DeviceSelector, resourcealphaapi.DeviceTaintSelector{})
driverKey := ptr.Deref(deviceSelector.Driver, anyDriver)
poolKey := ptr.Deref(deviceSelector.Pool, anyPool)
deviceKey := ptr.Deref(deviceSelector.Device, anyDevice)
return deviceID(driverKey, poolKey, deviceKey)
}
func (t *Tracker) sliceNamesForPatch(ctx context.Context, patch *resourcealphaapi.DeviceTaintRule) []string {
patchKey := driverPoolDeviceIndexPatchKey(patch)
sliceNames, err := t.resourceSlices.GetIndexer().IndexKeys(driverPoolDeviceIndexName, patchKey)
if err != nil {
t.handleError(ctx, err, "failed listing ResourceSlices for driver/pool/device key", "key", patchKey)
return nil
}
return sliceNames
}
func (t *Tracker) resourceSliceAdd(ctx context.Context) func(obj any) {
logger := klog.FromContext(ctx)
return func(obj any) {
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
return
}
logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice))
t.syncSlice(ctx, slice.Name, true)
}
}
func (t *Tracker) resourceSliceUpdate(ctx context.Context) func(oldObj, newObj any) {
logger := klog.FromContext(ctx)
return func(oldObj, newObj any) {
oldSlice, ok := oldObj.(*resourceapi.ResourceSlice)
if !ok {
return
}
newSlice, ok := newObj.(*resourceapi.ResourceSlice)
if !ok {
return
}
if loggerV := logger.V(6); loggerV.Enabled() {
// While debugging, one needs a full dump of the objects for context *and*
// a diff because otherwise small changes would be hard to spot.
loggerV.Info("ResourceSlice update", "slice", klog.Format(oldSlice), "oldSlice", klog.Format(newSlice), "diff", cmp.Diff(oldSlice, newSlice))
} else {
logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice))
}
t.syncSlice(ctx, newSlice.Name, true)
}
}
func (t *Tracker) resourceSliceDelete(ctx context.Context) func(obj any) {
logger := klog.FromContext(ctx)
return func(obj any) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
return
}
logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice))
t.syncSlice(ctx, slice.Name, true)
}
}
func (t *Tracker) deviceTaintAdd(ctx context.Context) func(obj any) {
logger := klog.FromContext(ctx)
return func(obj any) {
patch, ok := obj.(*resourcealphaapi.DeviceTaintRule)
if !ok {
return
}
logger.V(5).Info("DeviceTaintRule add", "patch", klog.KObj(patch))
for _, sliceName := range t.sliceNamesForPatch(ctx, patch) {
t.syncSlice(ctx, sliceName, false)
}
}
}
func (t *Tracker) deviceTaintUpdate(ctx context.Context) func(oldObj, newObj any) {
logger := klog.FromContext(ctx)
return func(oldObj, newObj any) {
oldPatch, ok := oldObj.(*resourcealphaapi.DeviceTaintRule)
if !ok {
return
}
newPatch, ok := newObj.(*resourcealphaapi.DeviceTaintRule)
if !ok {
return
}
if loggerV := logger.V(6); loggerV.Enabled() {
loggerV.Info("DeviceTaintRule update", "patch", klog.KObj(newPatch), "diff", cmp.Diff(oldPatch, newPatch))
} else {
logger.V(5).Info("DeviceTaintRule update", "patch", klog.KObj(newPatch))
}
// Slices that matched the old patch may need to be updated, in
// case they no longer match the new patch and need to have the
// patch's changes reverted.
slicesToSync := sets.New[string]()
slicesToSync.Insert(t.sliceNamesForPatch(ctx, oldPatch)...)
slicesToSync.Insert(t.sliceNamesForPatch(ctx, newPatch)...)
for _, sliceName := range slicesToSync.UnsortedList() {
t.syncSlice(ctx, sliceName, false)
}
}
}
func (t *Tracker) deviceTaintDelete(ctx context.Context) func(obj any) {
logger := klog.FromContext(ctx)
return func(obj any) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
patch, ok := obj.(*resourcealphaapi.DeviceTaintRule)
if !ok {
return
}
logger.V(5).Info("DeviceTaintRule delete", "patch", klog.KObj(patch))
for _, sliceName := range t.sliceNamesForPatch(ctx, patch) {
t.syncSlice(ctx, sliceName, false)
}
}
}
func (t *Tracker) deviceClassAdd(ctx context.Context) func(obj any) {
logger := klog.FromContext(ctx)
return func(obj any) {
class, ok := obj.(*resourceapi.DeviceClass)
if !ok {
return
}
logger.V(5).Info("DeviceClass add", "class", klog.KObj(class))
for _, sliceName := range t.resourceSlices.GetIndexer().ListKeys() {
t.syncSlice(ctx, sliceName, false)
}
}
}
func (t *Tracker) deviceClassUpdate(ctx context.Context) func(oldObj, newObj any) {
logger := klog.FromContext(ctx)
return func(oldObj, newObj any) {
oldClass, ok := oldObj.(*resourceapi.DeviceClass)
if !ok {
return
}
newClass, ok := newObj.(*resourceapi.DeviceClass)
if !ok {
return
}
if loggerV := logger.V(6); loggerV.Enabled() {
loggerV.Info("DeviceClass update", "class", klog.KObj(newClass), "diff", cmp.Diff(oldClass, newClass))
} else {
logger.V(5).Info("DeviceClass update", "class", klog.KObj(newClass))
}
for _, sliceName := range t.resourceSlices.GetIndexer().ListKeys() {
t.syncSlice(ctx, sliceName, false)
}
}
}
func (t *Tracker) deviceClassDelete(ctx context.Context) func(obj any) {
logger := klog.FromContext(ctx)
return func(obj any) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
class, ok := obj.(*resourceapi.DeviceClass)
if !ok {
return
}
logger.V(5).Info("DeviceClass delete", "class", klog.KObj(class))
for _, sliceName := range t.resourceSlices.GetIndexer().ListKeys() {
t.syncSlice(ctx, sliceName, false)
}
}
}
// syncSlice updates the slice with the given name, applying
// DeviceTaintRules that match. sendEvent is used to force the Tracker
// to publish an event for listeners added by [Tracker.AddEventHandler]. It
// is set when syncSlice is triggered by a ResourceSlice event to avoid
// doing costly DeepEqual comparisons where possible.
func (t *Tracker) syncSlice(ctx context.Context, name string, sendEvent bool) {
defer t.emitEvents()
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "resourceslice", name)
ctx = klog.NewContext(ctx, logger)
logger.V(5).Info("syncing ResourceSlice")
obj, sliceExists, err := t.resourceSlices.GetIndexer().GetByKey(name)
if err != nil {
t.handleError(ctx, err, "failed to lookup existing resource slice", "resourceslice", name)
return
}
oldPatchedObj, oldSliceExists, err := t.patchedResourceSlices.GetByKey(name)
if err != nil {
t.handleError(ctx, err, "failed to lookup cached patched resource slice", "resourceslice", name)
return
}
if !sliceExists {
// Nothing to do if we never stored a patched slice for this name.
if !oldSliceExists {
return
}
err := t.patchedResourceSlices.Delete(oldPatchedObj)
if err != nil {
t.handleError(ctx, err, "failed to delete cached patched resource slice", "resourceslice", name)
return
}
t.pushEvent(oldPatchedObj, nil)
logger.V(5).Info("patched ResourceSlice deleted")
return
}
var oldPatchedSlice *resourceapi.ResourceSlice
if oldSliceExists {
var ok bool
oldPatchedSlice, ok = oldPatchedObj.(*resourceapi.ResourceSlice)
if !ok {
t.handleError(ctx, errors.New("invalid type in resource slice cache"), "failed to cast cached patched ResourceSlice", "expectedType", fmt.Sprintf("%T", (*resourceapi.ResourceSlice)(nil)), "gotType", fmt.Sprintf("%T", oldPatchedObj))
return
}
}
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
t.handleError(ctx, errors.New("invalid type in resource slice cache"), "failed to cast ResourceSlice from informer cache", "expectedType", fmt.Sprintf("%T", (*resourceapi.ResourceSlice)(nil)), "gotType", fmt.Sprintf("%T", obj))
return
}
patches := typedSlice[*resourcealphaapi.DeviceTaintRule](t.deviceTaints.GetIndexer().List())
patchedSlice, err := t.applyPatches(ctx, slice, patches)
if err != nil {
t.handleError(ctx, err, "failed to apply patches to ResourceSlice", "resourceslice", klog.KObj(slice))
return
}
// When syncSlice is triggered by something other than a ResourceSlice
// event, only the device taints may differ from the previously patched
// slice. Comparing just the taints avoids a costly DeepEqual of the
// entire object.
if !sendEvent && oldPatchedSlice != nil {
for i := range patchedSlice.Spec.Devices {
oldDevice := oldPatchedSlice.Spec.Devices[i]
newDevice := patchedSlice.Spec.Devices[i]
sendEvent = sendEvent ||
!slices.EqualFunc(getTaints(oldDevice), getTaints(newDevice), taintsEqual)
}
}
err = t.patchedResourceSlices.Add(patchedSlice)
if err != nil {
t.handleError(ctx, err, "failed to add patched resource slice to cache", "resourceslice", klog.KObj(patchedSlice))
return
}
if sendEvent {
t.pushEvent(oldPatchedObj, patchedSlice)
}
if loggerV := logger.V(6); loggerV.Enabled() {
loggerV.Info("ResourceSlice synced", "diff", cmp.Diff(oldPatchedObj, patchedSlice))
} else {
logger.V(5).Info("ResourceSlice synced")
}
}
func (t *Tracker) applyPatches(ctx context.Context, slice *resourceapi.ResourceSlice, taintRules []*resourcealphaapi.DeviceTaintRule) (*resourceapi.ResourceSlice, error) {
logger := klog.FromContext(ctx)
// slice will be DeepCopied just-in-time, only when necessary.
patchedSlice := slice
for _, taintRule := range taintRules {
logger := klog.LoggerWithValues(logger, "deviceTaintRule", klog.KObj(taintRule))
logger.V(6).Info("processing DeviceTaintRule")
deviceSelector := taintRule.Spec.DeviceSelector
var deviceClassExprs []cel.CompilationResult
var selectorExprs []cel.CompilationResult
var deviceName *string
if deviceSelector != nil {
if deviceSelector.Driver != nil && *deviceSelector.Driver != slice.Spec.Driver {
logger.V(7).Info("DeviceTaintRule does not apply, mismatched driver", "sliceDriver", slice.Spec.Driver, "taintDriver", *deviceSelector.Driver)
continue
}
if deviceSelector.Pool != nil && *deviceSelector.Pool != slice.Spec.Pool.Name {
logger.V(7).Info("DeviceTaintRule does not apply, mismatched pool", "slicePool", slice.Spec.Pool.Name, "taintPool", *deviceSelector.Pool)
continue
}
deviceName = deviceSelector.Device
if deviceSelector.DeviceClassName != nil {
logger := logger.WithValues("deviceClassName", *deviceSelector.DeviceClassName)
classObj, exists, err := t.deviceClasses.GetIndexer().GetByKey(*deviceSelector.DeviceClassName)
if err != nil {
return nil, fmt.Errorf("failed to get device class %s for DeviceTaintRule %s", *deviceSelector.DeviceClassName, taintRule.Name)
}
if !exists {
logger.V(7).Info("DeviceTaintRule does not apply, DeviceClass does not exist")
continue
}
class := classObj.(*resourceapi.DeviceClass)
for _, selector := range class.Spec.Selectors {
if selector.CEL != nil {
expr := t.celCache.GetOrCompile(selector.CEL.Expression)
deviceClassExprs = append(deviceClassExprs, expr)
}
}
}
for _, selector := range deviceSelector.Selectors {
if selector.CEL != nil {
expr := t.celCache.GetOrCompile(selector.CEL.Expression)
selectorExprs = append(selectorExprs, expr)
}
}
}
devices:
for dIndex, device := range slice.Spec.Devices {
deviceID := deviceID(slice.Spec.Driver, slice.Spec.Pool.Name, device.Name)
logger := logger.WithValues("device", deviceID)
if deviceName != nil && *deviceName != device.Name {
logger.V(7).Info("DeviceTaintRule does not apply, mismatched device", "sliceDevice", device.Name, "taintDevice", *deviceSelector.Device)
continue
}
deviceAttributes := getAttributes(device)
deviceCapacity := getCapacity(device)
for i, expr := range deviceClassExprs {
if expr.Error != nil {
// Could happen if some future apiserver accepted some
// future expression and then got downgraded. Normally
// the "stored expression" mechanism prevents that, but
// this code here might be more than one release older
// than the cluster it runs in.
return nil, fmt.Errorf("DeviceTaintRule %s: class %s: selector #%d: CEL compile error: %w", taintRule.Name, *deviceSelector.DeviceClassName, i, expr.Error)
}
matches, details, err := expr.DeviceMatches(ctx, cel.Device{Driver: slice.Spec.Driver, Attributes: deviceAttributes, Capacity: deviceCapacity})
logger.V(7).Info("CEL result", "class", *deviceSelector.DeviceClassName, "selector", i, "expression", expr.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err)
if err != nil {
continue devices
}
if !matches {
continue devices
}
}
for i, expr := range selectorExprs {
if expr.Error != nil {
// Could happen if some future apiserver accepted some
// future expression and then got downgraded. Normally
// the "stored expression" mechanism prevents that, but
// this code here might be more than one release older
// than the cluster it runs in.
return nil, fmt.Errorf("DeviceTaintRule %s: selector #%d: CEL compile error: %w", taintRule.Name, i, expr.Error)
}
matches, details, err := expr.DeviceMatches(ctx, cel.Device{Driver: slice.Spec.Driver, Attributes: deviceAttributes, Capacity: deviceCapacity})
logger.V(7).Info("CEL result", "selector", i, "expression", expr.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err)
if err != nil {
if t.recorder != nil {
t.recorder.Eventf(taintRule, v1.EventTypeWarning, "CELRuntimeError", "selector #%d: runtime error: %v", i, err)
}
continue devices
}
if !matches {
continue devices
}
}
logger.V(6).Info("applying matching DeviceTaintRule")
// TODO: remove conversion once taint is already in the right API package.
ta := resourceapi.DeviceTaint{
Key: taintRule.Spec.Taint.Key,
Value: taintRule.Spec.Taint.Value,
Effect: resourceapi.DeviceTaintEffect(taintRule.Spec.Taint.Effect),
TimeAdded: taintRule.Spec.Taint.TimeAdded,
}
if patchedSlice == slice {
patchedSlice = slice.DeepCopy()
}
appendTaint(&patchedSlice.Spec.Devices[dIndex], ta)
}
}
return patchedSlice, nil
}
func getAttributes(device resourceapi.Device) map[resourceapi.QualifiedName]resourceapi.DeviceAttribute {
if device.Basic != nil {
return device.Basic.Attributes
}
return nil
}
func getCapacity(device resourceapi.Device) map[resourceapi.QualifiedName]resourceapi.DeviceCapacity {
if device.Basic != nil {
return device.Basic.Capacity
}
return nil
}
func getTaints(device resourceapi.Device) []resourceapi.DeviceTaint {
if device.Basic != nil {
return device.Basic.Taints
}
return nil
}
func appendTaint(device *resourceapi.Device, taint resourceapi.DeviceTaint) {
if device.Basic != nil {
device.Basic.Taints = append(device.Basic.Taints, taint)
return
}
}
func taintsEqual(a, b resourceapi.DeviceTaint) bool {
return a.Key == b.Key &&
a.Effect == b.Effect &&
a.Value == b.Value &&
a.TimeAdded.Equal(b.TimeAdded) // Equal deals with nil.
}
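// deviceID builds one driver/pool/device index key, e.g. (illustrative)
// "dra.example.com/pool-a/dev-0" or, with wildcards, "*/pool-a/*".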
func deviceID(driver, pool, device string) string {
return driver + "/" + pool + "/" + device
}
func typedSlice[T any](objs []any) []T {
if objs == nil {
return nil
}
typed := make([]T, 0, len(objs))
for _, obj := range objs {
typed = append(typed, obj.(T))
}
return typed
}
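
End-to-end, the behavior can be demonstrated with a test along these lines
(a sketch, not part of this commit; the fake clientset wiring, the tracker
import path, and the v1alpha3 constants are assumptions based on the APIs
used above):

package tracker_test

import (
	"context"
	"testing"

	resourcealphaapi "k8s.io/api/resource/v1alpha3"
	resourceapi "k8s.io/api/resource/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
	"k8s.io/dynamic-resource-allocation/resourceslice/tracker" // assumed path
	"k8s.io/utils/ptr"
)

func TestTaintRuleIsApplied(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// One slice with a single device, and a rule tainting exactly that device.
	slice := &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "slice-0"},
		Spec: resourceapi.ResourceSliceSpec{
			Driver:  "dra.example.com",
			Pool:    resourceapi.ResourcePool{Name: "pool-a"},
			Devices: []resourceapi.Device{{Name: "dev-0", Basic: &resourceapi.BasicDevice{}}},
		},
	}
	rule := &resourcealphaapi.DeviceTaintRule{
		ObjectMeta: metav1.ObjectMeta{Name: "dev-0-unhealthy"},
		Spec: resourcealphaapi.DeviceTaintRuleSpec{
			DeviceSelector: &resourcealphaapi.DeviceTaintSelector{
				Driver: ptr.To("dra.example.com"),
				Pool:   ptr.To("pool-a"),
				Device: ptr.To("dev-0"),
			},
			Taint: resourcealphaapi.DeviceTaint{
				Key:    "example.com/unhealthy",
				Effect: resourcealphaapi.DeviceTaintEffectNoSchedule,
			},
		},
	}
	client := fake.NewSimpleClientset(slice, rule)
	factory := informers.NewSharedInformerFactory(client, 0)
	rsTracker, err := tracker.StartTracker(ctx, tracker.Options{
		EnableDeviceTaints: true,
		SliceInformer:      factory.Resource().V1beta1().ResourceSlices(),
		TaintInformer:      factory.Resource().V1alpha3().DeviceTaintRules(),
		ClassInformer:      factory.Resource().V1beta1().DeviceClasses(),
	})
	if err != nil {
		t.Fatal(err)
	}
	defer rsTracker.Stop()
	factory.Start(ctx.Done())
	cache.WaitForCacheSync(ctx.Done(), rsTracker.HasSynced)
	patched, err := rsTracker.ListPatchedResourceSlices()
	if err != nil || len(patched) != 1 {
		t.Fatalf("expected exactly one patched slice, got %v (err: %v)", patched, err)
	}
	taints := patched[0].Spec.Devices[0].Basic.Taints
	if len(taints) != 1 || taints[0].Key != "example.com/unhealthy" {
		t.Fatalf("expected the taint to be applied, got %v", taints)
	}
}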