diff --git a/staging/src/k8s.io/dynamic-resource-allocation/internal/workqueue/mockqueue.go b/staging/src/k8s.io/dynamic-resource-allocation/internal/workqueue/mockqueue.go new file mode 100644 index 00000000000..bf6b6f15b69 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/internal/workqueue/mockqueue.go @@ -0,0 +1,247 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "maps" + "slices" + "sync" + "time" + + "k8s.io/client-go/util/workqueue" +) + +// TODO (pohly): move this to k8s.io/client-go/util/workqueue/mockqueue.go +// if it turns out to be generally useful. Doc comments are already written +// as if the code was there. + +// MockQueue is an implementation of [TypedRateLimitingInterface] which +// can be used to test a function which pulls work items out of a queue +// and processes them. It is thread-safe. +// +// A null instance is directly usable. The usual usage is: +// +// var m workqueue.Mock[string] +// m.SyncOne("some-item", func(queue workqueue.TypedRateLimitingInterface[string]) { ... } ) +// if diff := cmp.Diff(workqueue.MockState[string]{}, m.State()); diff != "" { +// t.Errorf("unexpected state of mock work queue after sync (-want, +got):\n%s", diff) +// } +// +// All slices get reset to nil when they become empty, so there are no spurious +// differences because of nil vs. empty slice. +type Mock[T comparable] struct { + mutex sync.Mutex + state MockState[T] +} + +type MockState[T comparable] struct { + // Ready contains the items which are ready for processing. + Ready []T + + // InFlight contains the items which are currently being processed (= Get + // was called, Done not yet). + InFlight []T + + // MismatchedDone contains the items for which Done was called without + // a matching Get. + MismatchedDone []T + + // Later contains the items which are meant to be added to the queue after + // a certain delay (= AddAfter was called for them). They appear in the + // order in which AddAfter got called. + Later []MockDelayedItem[T] + + // Failures contains the items and their retry count which failed to be + // processed (AddRateLimited called at least once, Forget not yet). + // The retry count is always larger than zero. + Failures map[T]int + + // ShutDownCalled tracks how often ShutDown got called. + ShutDownCalled int + + // ShutDownWithDrainCalled tracks how often ShutDownWithDrain got called. + ShutDownWithDrainCalled int +} + +// DeepCopy takes a snapshot of all slices. It cannot do a deep copy of the items in those slices, +// but typically those keys are immutable. +func (m MockState[T]) DeepCopy() *MockState[T] { + m.Ready = slices.Clone(m.Ready) + m.InFlight = slices.Clone(m.InFlight) + m.MismatchedDone = slices.Clone(m.MismatchedDone) + m.Later = slices.Clone(m.Later) + m.Failures = maps.Clone(m.Failures) + return &m +} + +// MockDelayedItem is an item which was queue for later processing. 
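+// The Duration field records the delay that was passed to AddAfter.
+// The mock does not run timers, so delayed items remain in
+// MockState.Later until the test inspects the state.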
+type MockDelayedItem[T comparable] struct {
+	Item     T
+	Duration time.Duration
+}
+
+// SyncOne adds the item to the work queue and calls sync.
+// That sync function can pull one or more items from the work
+// queue until the queue is empty. Then it is told that the queue
+// is shutting down, which must cause it to return.
+//
+// The test can then retrieve the state of the queue to check the result.
+func (m *Mock[T]) SyncOne(item T, sync func(workqueue.TypedRateLimitingInterface[T])) {
+	// sync must run with the mutex not locked.
+	defer sync(m)
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	m.state.Ready = append(m.state.Ready, item)
+}
+
+// State returns the current state of the queue.
+func (m *Mock[T]) State() MockState[T] {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	return *m.state.DeepCopy()
+}
+
+// Add implements [TypedInterface].
+func (m *Mock[T]) Add(item T) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if !slices.Contains(m.state.Ready, item) {
+		m.state.Ready = append(m.state.Ready, item)
+	}
+}
+
+// Len implements [TypedInterface].
+func (m *Mock[T]) Len() int {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	return len(m.state.Ready)
+}
+
+// Get implements [TypedInterface].
+func (m *Mock[T]) Get() (item T, shutdown bool) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if len(m.state.Ready) == 0 {
+		shutdown = true
+		return
+	}
+	item = m.state.Ready[0]
+	m.state.Ready = m.state.Ready[1:]
+	if len(m.state.Ready) == 0 {
+		m.state.Ready = nil
+	}
+	m.state.InFlight = append(m.state.InFlight, item)
+	return item, false
+}
+
+// Done implements [TypedInterface].
+func (m *Mock[T]) Done(item T) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	index := slices.Index(m.state.InFlight, item)
+	if index < 0 {
+		m.state.MismatchedDone = append(m.state.MismatchedDone, item)
+		return
+	}
+	m.state.InFlight = slices.Delete(m.state.InFlight, index, index+1)
+	if len(m.state.InFlight) == 0 {
+		m.state.InFlight = nil
+	}
+}
+
+// ShutDown implements [TypedInterface].
+func (m *Mock[T]) ShutDown() {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	m.state.ShutDownCalled++
+}
+
+// ShutDownWithDrain implements [TypedInterface].
+func (m *Mock[T]) ShutDownWithDrain() {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	m.state.ShutDownWithDrainCalled++
+}
+
+// ShuttingDown implements [TypedInterface].
+func (m *Mock[T]) ShuttingDown() bool {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	return m.state.ShutDownCalled > 0 || m.state.ShutDownWithDrainCalled > 0
+}
+
+// AddAfter implements [TypedDelayingInterface.AddAfter].
+func (m *Mock[T]) AddAfter(item T, duration time.Duration) {
+	if duration == 0 {
+		m.Add(item)
+		return
+	}
+
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	for i := range m.state.Later {
+		if m.state.Later[i].Item == item {
+			// https://github.com/kubernetes/client-go/blob/270e5ab1714527c455865953da8ceba2810dbb50/util/workqueue/delaying_queue.go#L340-L349
+			// only shortens the delay for an existing item. It does not make it longer.
+			if m.state.Later[i].Duration > duration {
+				m.state.Later[i].Duration = duration
+			}
+			return
+		}
+	}
+
+	m.state.Later = append(m.state.Later, MockDelayedItem[T]{Item: item, Duration: duration})
+}
+
+// AddRateLimited implements [TypedRateLimitingInterface.AddRateLimited].
+func (m *Mock[T]) AddRateLimited(item T) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if m.state.Failures == nil {
+		m.state.Failures = make(map[T]int)
+	}
+	m.state.Failures[item]++
+}
+
+// Forget implements [TypedRateLimitingInterface.Forget].
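+// It removes the item from Failures, so NumRequeues reports zero for it again.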
+func (m *Mock[T]) Forget(item T) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.state.Failures == nil { + return + } + delete(m.state.Failures, item) +} + +// NumRequeues implements [TypedRateLimitingInterface.NumRequeues]. +func (m *Mock[T]) NumRequeues(item T) int { + m.mutex.Lock() + defer m.mutex.Unlock() + + return m.state.Failures[item] +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index 37dc11ea677..3adbf6b2318 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -393,7 +393,9 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e driverResources := &resourceslice.DriverResources{ Pools: map[string]resourceslice.Pool{ d.nodeName: { - Devices: resources.Devices, + Slices: []resourceslice.Slice{{ + Devices: resources.Devices, + }}, }, }, } @@ -407,7 +409,13 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller") controllerCtx = klog.NewContext(controllerCtx, controllerLogger) var err error - if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources); err != nil { + if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, + resourceslice.Options{ + DriverName: d.driverName, + KubeClient: d.kubeClient, + Owner: &owner, + Resources: driverResources, + }); err != nil { return fmt.Errorf("start ResourceSlice controller: %w", err) } return nil diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 0f869181197..7a5341b6219 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -20,19 +20,21 @@ import ( "context" "errors" "fmt" - "sort" "sync" + "sync/atomic" "time" "github.com/google/go-cmp/cmp" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1alpha3" + apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" resourceinformers "k8s.io/client-go/informers/resource/v1alpha3" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -49,6 +51,20 @@ const ( // poolNameIndex is the name for the ResourceSlice store's index function, // which is to index by ResourceSlice.Spec.Pool.Name poolNameIndex = "poolName" + + // Including adds in the mutation cache is not safe: We could add a slice, store it, + // and then the slice gets deleted without the informer hearing anything about that. + // Then the obsolete slice remains in the mutation cache. + // + // To mitigate this, we use a TTL and check a pool again once added slices expire. + defaultMutationCacheTTL = time.Minute + + // defaultSyncDelay defines how long to wait between receiving the most recent + // informer event and syncing again. 
This is long enough that the informer cache + // should be up-to-date (matter mostly for deletes because an out-dated cache + // causes redundant delete API calls) and not too long that a human mistake + // doesn't get fixed while that human is waiting for it. + defaultSyncDelay = 30 * time.Second ) // Controller synchronizes information about resources of one driver with @@ -57,13 +73,20 @@ const ( // controller as part of its kubelet plugin. type Controller struct { cancel func(cause error) - driver string - owner Owner + driverName string + owner *Owner kubeClient kubernetes.Interface wg sync.WaitGroup // The queue is keyed with the pool name that needs work. - queue workqueue.TypedRateLimitingInterface[string] - sliceStore cache.Indexer + queue workqueue.TypedRateLimitingInterface[string] + sliceStore cache.MutationCache + mutationCacheTTL time.Duration + syncDelay time.Duration + + // Must use atomic access... + numCreates int64 + numUpdates int64 + numDeletes int64 mutex sync.RWMutex @@ -94,7 +117,28 @@ type Pool struct { // by the controller. Generation int64 - // Device names must be unique inside the pool. + // Slices is a list of all ResourceSlices that the driver + // wants to publish for this pool. The driver must ensure + // that each resulting slice is valid. See the API + // definition for details, in particular the limit on + // the number of devices. + // + // If slices are not valid, then the controller will + // log errors produced by the apiserver. + // + // Drivers should publish at least one slice for each + // pool that they normally manage, even if that slice + // is empty. "Empty pool" is different from "no pool" + // because it shows that the driver is up-and-running + // and simply doesn't have any devices. + Slices []Slice +} + +// +k8s:deepcopy-gen=true + +// Slice is turned into one ResourceSlice by the controller. +type Slice struct { + // Devices lists all devices which are part of the slice. Devices []resourceapi.Device } @@ -110,19 +154,9 @@ type Owner struct { } // StartController constructs a new controller and starts it. -// If the owner is a v1.Node, then the NodeName field in the -// ResourceSlice objects is set and used to identify objects -// managed by the controller. The UID is not needed in that -// case, the controller will determine it automatically. -// -// If a kubeClient is provided, then it synchronizes ResourceSlices -// with the resource information provided by plugins. Without it, -// the controller is inactive. This can happen when kubelet is run stand-alone -// without an apiserver. In that case we can't and don't need to publish -// ResourceSlices. -func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) { +func StartController(ctx context.Context, options Options) (*Controller, error) { logger := klog.FromContext(ctx) - c, err := newController(ctx, kubeClient, driver, owner, resources) + c, err := newController(ctx, options) if err != nil { return nil, fmt.Errorf("create controller: %w", err) } @@ -134,15 +168,49 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, drive defer logger.V(3).Info("Stopping") c.run(ctx) }() - - // Sync each pool once. - for poolName := range resources.Pools { - c.queue.Add(poolName) - } - return c, nil } +// Options contains various optional settings for [StartController]. +type Options struct { + // DriverName is the required name of the DRA driver. 
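+	// It is stored in the spec.driver field of every published ResourceSlice
+	// and becomes part of the generated object names.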
+ DriverName string + + // KubeClient is used to read Node objects (if necessary) and to access + // ResourceSlices. It must be specified. + KubeClient kubernetes.Interface + + // If the owner is a v1.Node, then the NodeName field in the + // ResourceSlice objects is set and used to identify objects + // managed by the controller. The UID is not needed in that + // case, the controller will determine it automatically. + // + // The owner must be cluster-scoped. This is not always possible, + // therefore it is optional. A driver without a owner must take + // care that remaining slices get deleted manually as part of + // a driver uninstall because garbage collection won't work. + Owner *Owner + + // This is the initial desired set of slices. + Resources *DriverResources + + // Queue can be used to override the default work queue implementation. + Queue workqueue.TypedRateLimitingInterface[string] + + // MutationCacheTTL can be used to change the default TTL of one minute. + // See source code for details. + MutationCacheTTL *time.Duration + + // SyncDelay defines how long to wait between receiving the most recent + // informer event and syncing again. The default is 30 seconds. + // + // This is long enough that the informer cache should be up-to-date + // (matter mostly for deletes because an out-dated cache causes + // redundant delete API calls) and not too long that a human mistake + // doesn't get fixed while that human is waiting for it. + SyncDelay *time.Duration +} + // Stop cancels all background activity and blocks until the controller has stopped. func (c *Controller) Stop() { if c == nil { @@ -154,8 +222,8 @@ func (c *Controller) Stop() { // Update sets the new desired state of the resource information. // -// The controller takes over ownership, so these resources must -// not get modified after this method returns. +// The controller is doing a deep copy, so the caller may update +// the instance once Update returns. func (c *Controller) Update(resources *DriverResources) { c.mutex.Lock() defer c.mutex.Unlock() @@ -165,7 +233,7 @@ func (c *Controller) Update(resources *DriverResources) { c.queue.Add(poolName) } - c.resources = resources + c.resources = resources.DeepCopy() // ... and the new ones (might be the same). for poolName := range c.resources.Pools { @@ -173,29 +241,64 @@ func (c *Controller) Update(resources *DriverResources) { } } +// GetStats provides some insights into operations of the controller. +func (c *Controller) GetStats() Stats { + s := Stats{ + NumCreates: atomic.LoadInt64(&c.numCreates), + NumUpdates: atomic.LoadInt64(&c.numUpdates), + NumDeletes: atomic.LoadInt64(&c.numDeletes), + } + return s +} + +type Stats struct { + // NumCreates counts the number of ResourceSlices that got created. + NumCreates int64 + // NumUpdates counts the number of ResourceSlices that got update. + NumUpdates int64 + // NumDeletes counts the number of ResourceSlices that got deleted. + NumDeletes int64 +} + // newController creates a new controller. 
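+// It validates the required options (KubeClient, DriverName, Resources),
+// deep-copies the caller-provided owner and resources, and falls back to a
+// default rate-limited work queue when Options.Queue is not set.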
-func newController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) { - if kubeClient == nil { - return nil, fmt.Errorf("kubeClient is nil") +func newController(ctx context.Context, options Options) (*Controller, error) { + if options.KubeClient == nil { + return nil, errors.New("KubeClient is nil") + } + if options.DriverName == "" { + return nil, errors.New("DRA driver name is empty") + } + if options.Resources == nil { + return nil, errors.New("DriverResources are nil") } ctx, cancel := context.WithCancelCause(ctx) c := &Controller{ - cancel: cancel, - kubeClient: kubeClient, - driver: driver, - owner: owner, - queue: workqueue.NewTypedRateLimitingQueueWithConfig( + cancel: cancel, + kubeClient: options.KubeClient, + driverName: options.DriverName, + owner: options.Owner.DeepCopy(), + queue: options.Queue, + resources: options.Resources.DeepCopy(), + mutationCacheTTL: ptr.Deref(options.MutationCacheTTL, defaultMutationCacheTTL), + syncDelay: ptr.Deref(options.SyncDelay, defaultSyncDelay), + } + if c.queue == nil { + c.queue = workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"}, - ), - resources: resources, + ) } - if err := c.initInformer(ctx); err != nil { return nil, err } + + // Sync each desired pool once. + for poolName := range options.Resources.Pools { + c.queue.Add(poolName) + } + return c, nil } @@ -205,10 +308,10 @@ func (c *Controller) initInformer(ctx context.Context) error { // We always filter by driver name, by node name only for node-local resources. selector := fields.Set{ - resourceapi.ResourceSliceSelectorDriver: c.driver, + resourceapi.ResourceSliceSelectorDriver: c.driverName, resourceapi.ResourceSliceSelectorNodeName: "", } - if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { + if c.owner != nil && c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { selector[resourceapi.ResourceSliceSelectorNodeName] = c.owner.Name } informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, cache.Indexers{ @@ -222,7 +325,7 @@ func (c *Controller) initInformer(ctx context.Context) error { }, func(options *metav1.ListOptions) { options.FieldSelector = selector.String() }) - c.sliceStore = informer.GetIndexer() + c.sliceStore = cache.NewIntegerResourceVersionMutationCache(informer.GetStore(), informer.GetIndexer(), c.mutationCacheTTL, true /* includeAdds */) handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { slice, ok := obj.(*resourceapi.ResourceSlice) @@ -230,7 +333,7 @@ func (c *Controller) initInformer(ctx context.Context) error { return } logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice)) - c.queue.Add(slice.Spec.Pool.Name) + c.queue.AddAfter(slice.Spec.Pool.Name, c.syncDelay) }, UpdateFunc: func(old, new any) { oldSlice, ok := old.(*resourceapi.ResourceSlice) @@ -246,8 +349,8 @@ func (c *Controller) initInformer(ctx context.Context) error { } else { logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice)) } - c.queue.Add(oldSlice.Spec.Pool.Name) - c.queue.Add(newSlice.Spec.Pool.Name) + c.queue.AddAfter(oldSlice.Spec.Pool.Name, c.syncDelay) + c.queue.AddAfter(newSlice.Spec.Pool.Name, c.syncDelay) }, DeleteFunc: func(obj any) { if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { @@ -258,7 +361,7 @@ func (c *Controller) initInformer(ctx 
context.Context) error { return } logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice)) - c.queue.Add(slice.Spec.Pool.Name) + c.queue.AddAfter(slice.Spec.Pool.Name, c.syncDelay) }, }) if err != nil { @@ -348,7 +451,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { // The result gets cached and is expected to not change while // the controller runs. var nodeName string - if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { + if c.owner != nil && c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { nodeName = c.owner.Name if c.owner.UID == "" { node, err := c.kubeClient.CoreV1().Nodes().Get(ctx, c.owner.Name, metav1.GetOptions{}) @@ -360,8 +463,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { } } - // Slices that don't match any driver resource can either be updated (if there - // are new driver resources that need to be stored) or they need to be deleted. + // Slices that don't match any driver slice need to be deleted. obsoleteSlices := make([]*resourceapi.ResourceSlice, 0, len(slices)) // Determine highest generation. @@ -381,92 +483,233 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { currentSlices = append(currentSlices, slice) } } - slices = currentSlices - - // Sort by name to ensure that keeping only the first slice is deterministic. - sort.Slice(slices, func(i, j int) bool { - return slices[i].Name < slices[j].Name - }) + logger.V(5).Info("Existing slices", "obsolete", klog.KObjSlice(obsoleteSlices), "current", klog.KObjSlice(currentSlices)) if pool, ok := resources.Pools[poolName]; ok { - if pool.Generation > generation { - generation = pool.Generation - } - - // Right now all devices get published in a single slice. - // We simply pick the first one, if there is one, and copy - // it in preparation for updating it. + // Match each existing slice against the desired slices. + // Two slices match if they contain exactly the same + // device IDs, in an arbitrary order. Such a matched + // slice gets updated with the desired content if + // there is a difference. // - // TODO: support splitting across slices, with unit tests. - if len(slices) > 0 { - obsoleteSlices = append(obsoleteSlices, slices[1:]...) - slices = []*resourceapi.ResourceSlice{slices[0].DeepCopy()} - } else { - slices = []*resourceapi.ResourceSlice{ - { - ObjectMeta: metav1.ObjectMeta{ - GenerateName: c.owner.Name + "-" + c.driver + "-", - }, - }, + // This supports updating the definition of devices + // in a slice. Adding or removing devices is done + // by deleting the old slice and creating a new one. + // + // This is primarily a simplification of the code: + // to support adding or removing devices from + // existing slices, we would have to identify "most + // similar" slices (= minimal editing distance). + // + // In currentSliceForDesiredSlice we keep track of + // which desired slice has a matched slice. + // + // At the end of the loop, each current slice is either + // a match or obsolete. + currentSliceForDesiredSlice := make(map[int]*resourceapi.ResourceSlice, len(pool.Slices)) + for _, currentSlice := range currentSlices { + matched := false + for i := range pool.Slices { + if _, ok := currentSliceForDesiredSlice[i]; ok { + // Already has a match. 
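+					// Each desired slice gets matched at most once; a second
+					// current slice with the same devices ends up as obsolete
+					// and gets deleted below.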
+ continue + } + if sameSlice(currentSlice, &pool.Slices[i]) { + currentSliceForDesiredSlice[i] = currentSlice + logger.V(5).Info("Matched existing slice", "slice", klog.KObj(currentSlice), "matchIndex", i) + matched = true + break + } + } + if !matched { + obsoleteSlices = append(obsoleteSlices, currentSlice) + logger.V(5).Info("Unmatched existing slice", "slice", klog.KObj(currentSlice)) } } - slice := slices[0] - slice.OwnerReferences = []metav1.OwnerReference{{ - APIVersion: c.owner.APIVersion, - Kind: c.owner.Kind, - Name: c.owner.Name, - UID: c.owner.UID, - Controller: ptr.To(true), - }} - slice.Spec.Driver = c.driver - slice.Spec.Pool.Name = poolName - slice.Spec.Pool.Generation = generation - slice.Spec.Pool.ResourceSliceCount = 1 - slice.Spec.NodeName = nodeName - slice.Spec.NodeSelector = pool.NodeSelector - slice.Spec.AllNodes = pool.NodeSelector == nil && nodeName == "" - slice.Spec.Devices = pool.Devices + // Desired metadata which must be set in each slice. + resourceSliceCount := len(pool.Slices) + numMatchedSlices := len(currentSliceForDesiredSlice) + numNewSlices := resourceSliceCount - numMatchedSlices + desiredPool := resourceapi.ResourcePool{ + Name: poolName, + Generation: generation, // May get updated later. + ResourceSliceCount: int64(resourceSliceCount), + } + desiredAllNodes := pool.NodeSelector == nil && nodeName == "" - if loggerV := logger.V(6); loggerV.Enabled() { - // Dump entire resource information. - loggerV.Info("Syncing resource slices", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "pool", pool) - } else { - logger.V(5).Info("Syncing resource slices", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices)) + // Now for each desired slice, figure out which of them are changed. + changedDesiredSlices := sets.New[int]() + for i, currentSlice := range currentSliceForDesiredSlice { + // Reordering entries is a difference and causes an update even if the + // entries are the same. + if !apiequality.Semantic.DeepEqual(¤tSlice.Spec.Pool, &desiredPool) || + !apiequality.Semantic.DeepEqual(currentSlice.Spec.NodeSelector, pool.NodeSelector) || + currentSlice.Spec.AllNodes != desiredAllNodes || + !apiequality.Semantic.DeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) { + changedDesiredSlices.Insert(i) + logger.V(5).Info("Need to update slice", "slice", klog.KObj(currentSlice), "matchIndex", i) + } + } + logger.V(5).Info("Completed comparison", + "numObsolete", len(obsoleteSlices), + "numMatchedSlices", len(currentSliceForDesiredSlice), + "numChangedMatchedSlices", len(changedDesiredSlices), + "numNewSlices", numNewSlices, + ) + + bumpedGeneration := false + switch { + case pool.Generation > generation: + // Bump up the generation if the driver asked for it, or + // start with a non-zero generation. + generation = pool.Generation + bumpedGeneration = true + logger.V(5).Info("Bumped generation to driver-provided generation", "generation", generation) + case numNewSlices == 0 && len(changedDesiredSlices) <= 1: + logger.V(5).Info("Kept generation because at most one update API call is necessary", "generation", generation) + default: + generation++ + bumpedGeneration = true + logger.V(5).Info("Bumped generation by one", "generation", generation) + } + desiredPool.Generation = generation + + // Update existing slices. 
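+		// Only slices whose content changed or whose pool generation was bumped
+		// get written back. Unchanged matches are skipped to avoid needless
+		// API calls.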
+ for i, currentSlice := range currentSliceForDesiredSlice { + if !changedDesiredSlices.Has(i) && !bumpedGeneration { + continue + } + slice := currentSlice.DeepCopy() + slice.Spec.Pool = desiredPool + // No need to set the node name. If it was different, we wouldn't + // have listed the existing slice. + slice.Spec.NodeSelector = pool.NodeSelector + slice.Spec.AllNodes = desiredAllNodes + slice.Spec.Devices = pool.Slices[i].Devices + + logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice)) + slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("update resource slice: %w", err) + } + atomic.AddInt64(&c.numUpdates, 1) + c.sliceStore.Mutation(slice) + } + + // Create new slices. + added := false + for i := 0; i < len(pool.Slices); i++ { + if _, ok := currentSliceForDesiredSlice[i]; ok { + // Was handled above through an update. + continue + } + var ownerReferences []metav1.OwnerReference + if c.owner != nil { + ownerReferences = append(ownerReferences, + metav1.OwnerReference{ + APIVersion: c.owner.APIVersion, + Kind: c.owner.Kind, + Name: c.owner.Name, + UID: c.owner.UID, + Controller: ptr.To(true), + }, + ) + } + generateName := c.driverName + "-" + if c.owner != nil { + generateName = c.owner.Name + "-" + generateName + } + slice := &resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences, + GenerateName: generateName, + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: c.driverName, + Pool: desiredPool, + NodeName: nodeName, + NodeSelector: pool.NodeSelector, + AllNodes: desiredAllNodes, + Devices: pool.Slices[i].Devices, + }, + } + + // It can happen that we create a missing slice, some + // other change than the create causes another sync of + // the pool, and then a second slice for the same set + // of devices would get created because the controller has + // no copy of the first slice instance in its informer + // cache yet. + // + // Using a https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache + // avoids that. + logger.V(5).Info("Creating new resource slice") + slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("create resource slice: %w", err) + } + atomic.AddInt64(&c.numCreates, 1) + c.sliceStore.Mutation(slice) + added = true + } + if added { + // Check that the recently added slice(s) really exist even + // after they expired from the mutation cache. + c.queue.AddAfter(poolName, c.mutationCacheTTL) } } else if len(slices) > 0 { // All are obsolete, pool does not exist anymore. - - logger.V(5).Info("Removing resource slices after pool removal", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices)) - obsoleteSlices = append(obsoleteSlices, slices...) - // No need to create or update the slices. - slices = nil + obsoleteSlices = slices + logger.V(5).Info("Removing resource slices after pool removal") } // Remove stale slices. 
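+	// Deletion uses UID and ResourceVersion preconditions so that a slice
+	// which got modified or recreated concurrently is not removed by accident.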
for _, slice := range obsoleteSlices { - logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice)) - if err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + options := metav1.DeleteOptions{ + Preconditions: &metav1.Preconditions{ + UID: &slice.UID, + ResourceVersion: &slice.ResourceVersion, + }, + } + // It can happen that we sync again shortly after deleting a + // slice and before the slice gets removed from the informer + // cache. The MutationCache can't help here because it does not + // track pending deletes. + // + // If this happens, we get a "not found error" and nothing + // changes on the server. The only downside is the extra API + // call. This isn't as bad as extra creates. + logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice), "deleteOptions", options) + err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, options) + switch { + case err == nil: + atomic.AddInt64(&c.numDeletes, 1) + case apierrors.IsNotFound(err): + logger.V(5).Info("Resource slice was already deleted earlier", "slice", klog.KObj(slice)) + default: return fmt.Errorf("delete resource slice: %w", err) } } - // Create or update slices. - for _, slice := range slices { - if slice.UID == "" { - logger.V(5).Info("Creating new resource slice", "slice", klog.KObj(slice)) - if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("create resource slice: %w", err) - } - continue - } - - // TODO: switch to SSA once unit testing supports it. - logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice)) - if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("update resource slice: %w", err) - } - } - return nil } + +func sameSlice(existingSlice *resourceapi.ResourceSlice, desiredSlice *Slice) bool { + if len(existingSlice.Spec.Devices) != len(desiredSlice.Devices) { + return false + } + + existingDevices := sets.New[string]() + for _, device := range existingSlice.Spec.Devices { + existingDevices.Insert(device.Name) + } + for _, device := range desiredSlice.Devices { + if !existingDevices.Has(device.Name) { + return false + } + } + + // Same number of devices, names all present -> equal. + return true +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go index 1ba33da54c5..dd461d35251 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go @@ -17,7 +17,12 @@ limitations under the License. package resourceslice import ( + "fmt" + "sort" + "strconv" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,243 +33,820 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + "k8s.io/dynamic-resource-allocation/internal/workqueue" + "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" "k8s.io/utils/ptr" ) +func init() { + klog.InitFlags(nil) +} + +// TestControllerSyncPool verifies that syncPool produces the right ResourceSlices. 
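+// Each test case seeds a fake client with initial ResourceSlices, runs the
+// controller against the desired DriverResources, and compares the resulting
+// slices and operation counts.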
+// Update vs. Create API calls are checked by bumping the ResourceVersion in +// updates. func TestControllerSyncPool(t *testing.T) { var ( - node = "node" + ownerName = "owner" nodeUID = types.UID("node-uid") - driveName = "driver" + driverName = "driver" poolName = "pool" - poolName1 = "pool-1" deviceName = "device" + deviceName1 = "device-1" + deviceName2 = "device-2" + deviceName3 = "device-3" + deviceName4 = "device-4" resourceSlice1 = "resource-slice-1" resourceSlice2 = "resource-slice-2" resourceSlice3 = "resource-slice-3" + generateName = ownerName + "-" + driverName + "-" + generatedName1 = generateName + "0" + basicDevice = &resourceapi.BasicDevice{ + Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "new-attribute": {StringValue: ptr.To("value")}, + }, + } + nodeSelector = &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{{ + MatchFields: []v1.NodeSelectorRequirement{{ + Key: "name", + Values: []string{"node-a"}, + }}, + }}, + } + otherNodeSelector = &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{{ + MatchFields: []v1.NodeSelectorRequirement{{ + Key: "name", + Values: []string{"node-b"}, + }}, + }}, + } ) testCases := map[string]struct { - nodeUID types.UID + syncDelay *time.Duration + // nodeUID is empty if not a node-local. + nodeUID types.UID + // noOwner completely disables setting an owner. + noOwner bool + // initialObjects is a list of initial resource slices to be used in the test. initialObjects []runtime.Object - poolName string + initialOtherObjects []runtime.Object inputDriverResources *DriverResources expectedResourceSlices []resourceapi.ResourceSlice + expectedStats Stats }{ - "single-resourceslice-existing": { - nodeUID: nodeUID, - initialObjects: []runtime.Object{ - newResourceSlice(resourceSlice1, node, driveName, poolName, 1), - }, - poolName: poolName, - inputDriverResources: &DriverResources{ - Pools: map[string]Pool{ - poolName: { - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, - }, - }, - }, - expectedResourceSlices: []resourceapi.ResourceSlice{ - *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1), - }, - }, - "single-resourceslice-existing-with-nodeUID-empty": { - nodeUID: "", - initialObjects: []runtime.Object{ - newNode(node, nodeUID), - newResourceSlice(resourceSlice1, node, driveName, poolName, 1), - }, - poolName: poolName, - inputDriverResources: &DriverResources{ - Pools: map[string]Pool{ - poolName: { - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, - }, - }, - }, - expectedResourceSlices: []resourceapi.ResourceSlice{ - *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1), - }, - }, - "multiple-resourceslices-existing": { - nodeUID: nodeUID, - initialObjects: []runtime.Object{ - newResourceSlice(resourceSlice1, node, driveName, poolName, 1), - newResourceSlice(resourceSlice2, node, driveName, poolName, 1), - newResourceSlice(resourceSlice3, node, driveName, poolName, 1), - }, - poolName: poolName, - inputDriverResources: &DriverResources{ - Pools: map[string]Pool{ - poolName: { - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, - }, - }, - }, - expectedResourceSlices: []resourceapi.ResourceSlice{ - *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1), - }, - }, - "no-resourceslices-existing": { + "create-slice": { nodeUID: nodeUID, initialObjects: []runtime.Object{}, - poolName: poolName, 
inputDriverResources: &DriverResources{ Pools: map[string]Pool{ poolName: { - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, + Slices: []Slice{{Devices: []resourceapi.Device{}}}, }, }, }, + expectedStats: Stats{ + NumCreates: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ - *newExpectResourceSlice(node+"-"+driveName+"-", node, string(nodeUID), driveName, poolName, deviceName, true, 0), + *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), }, }, - "single-resourceslice-existing-with-different-resource-pool-generation": { + "keep-slice-unchanged": { nodeUID: nodeUID, initialObjects: []runtime.Object{ - newResourceSlice(resourceSlice1, node, driveName, poolName, 2), - newResourceSlice(resourceSlice2, node, driveName, poolName, 1), - newResourceSlice(resourceSlice3, node, driveName, poolName, 1), + MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), }, - poolName: poolName, inputDriverResources: &DriverResources{ Pools: map[string]Pool{ poolName: { - Generation: 3, - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, - }, - }, - }, - expectedResourceSlices: []resourceapi.ResourceSlice{ - *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 3), - }, - }, - "single-resourceslice-existing-with-mismatching-resource-pool-name": { - nodeUID: nodeUID, - initialObjects: []runtime.Object{ - newResourceSlice(resourceSlice1, node, driveName, poolName, 1), - }, - poolName: poolName, - inputDriverResources: &DriverResources{ - Pools: map[string]Pool{ - poolName1: { Generation: 1, - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}, }, }, }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "remove-pool": { + nodeUID: nodeUID, + syncDelay: ptr.To(time.Duration(0)), + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{}, + expectedStats: Stats{ + NumDeletes: 1, + }, expectedResourceSlices: nil, }, + "delete-and-add-slice": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + // no devices + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{}). 
+ Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}}, + }, + }, + expectedStats: Stats{ + NumDeletes: 1, + NumCreates: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 1}).Obj(), + }, + }, + "delete-redundant-slice": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}}, + }, + }, + expectedStats: Stats{ + NumDeletes: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "update-slice": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{ + Devices: []resourceapi.Device{{ + Name: deviceName, + Basic: basicDevice, + }}, + }}, + }, + }, + }, + expectedStats: Stats{ + NumUpdates: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{ + Name: deviceName, + Basic: basicDevice, + }}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "update-slice-many-devices": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}, {Name: deviceName2}}). 
+ Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{ + Devices: []resourceapi.Device{ + {Name: deviceName1}, + { + Name: deviceName2, + Basic: basicDevice, + }, + }, + }}, + }, + }, + }, + expectedStats: Stats{ + NumUpdates: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{ + {Name: deviceName1}, + { + Name: deviceName2, + Basic: basicDevice, + }}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "multiple-resourceslices-existing-no-changes": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{ + {Devices: []resourceapi.Device{{Name: deviceName1}}}, + {Devices: []resourceapi.Device{{Name: deviceName2}}}, + {Devices: []resourceapi.Device{{Name: deviceName3}}}, + }, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + }, + "multiple-resourceslices-existing-with-different-resource-pool-generation": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + // no devices + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{}). 
+ Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + // matching device + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 1}).Obj(), + // no devices + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{ + Devices: []resourceapi.Device{ + { + Name: deviceName, + }, + }, + }}, + }, + }, + }, + expectedStats: Stats{ + NumDeletes: 2, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 1}).Obj(), + }, + }, + "multiple-resourceslices-existing-one-changed": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{ + {Devices: []resourceapi.Device{{Name: deviceName1}}}, + {Devices: []resourceapi.Device{{Name: deviceName2, Basic: basicDevice}}}, + {Devices: []resourceapi.Device{{Name: deviceName3}}}, + }, + }, + }, + }, + // Generation not bumped, only one update. + expectedStats: Stats{ + NumUpdates: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2, Basic: basicDevice}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). 
+ NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + }, + "multiple-resourceslices-existing-two-changed": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{ + {Devices: []resourceapi.Device{{Name: deviceName1}}}, + {Devices: []resourceapi.Device{{Name: deviceName2, Basic: basicDevice}}}, + {Devices: []resourceapi.Device{{Name: deviceName3, Basic: basicDevice}}}, + }, + }, + }, + }, + // Generation bumped, all updated. + expectedStats: Stats{ + NumUpdates: 3, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).ResourceVersion("1"). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2, Basic: basicDevice}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).ResourceVersion("1"). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3, Basic: basicDevice}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 3}).Obj(), + }, + }, + "multiple-resourceslices-existing-one-removed": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). 
+ NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{ + {Devices: []resourceapi.Device{{Name: deviceName1}}}, + {Devices: []resourceapi.Device{{Name: deviceName2}}}, + }, + }, + }, + }, + // Generation bumped, two updated, one removed. + expectedStats: Stats{ + NumUpdates: 2, + NumDeletes: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).ResourceVersion("1"). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 2}).Obj(), + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 2}).Obj(), + }, + }, + "multiple-resourceslices-existing-one-added": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{ + {Devices: []resourceapi.Device{{Name: deviceName1}}}, + {Devices: []resourceapi.Device{{Name: deviceName2}}}, + {Devices: []resourceapi.Device{{Name: deviceName3}}}, + {Devices: []resourceapi.Device{{Name: deviceName4}}}, + }, + }, + }, + }, + expectedStats: Stats{ + NumUpdates: 3, + NumCreates: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 4}).Obj(), + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 4}).Obj(), + *MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3).ResourceVersion("1"). 
+ NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 4}).Obj(), + *MakeResourceSlice().GenerateName(generateName).Name(generatedName1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName4}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 4}).Obj(), + }, + }, + "add-one-network-device-all-nodes": { + initialObjects: []runtime.Object{}, + noOwner: true, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}, + }, + }, + }, + expectedStats: Stats{ + NumCreates: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(driverName + "-0").GenerateName(driverName + "-"). + AllNodes(true). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "add-one-network-device-some-nodes": { + initialObjects: []runtime.Object{}, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + NodeSelector: nodeSelector, + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}, + }, + }, + }, + expectedStats: Stats{ + NumCreates: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + AppOwnerReferences(ownerName).NodeSelector(nodeSelector). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "update-node-selector": { + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + AppOwnerReferences(ownerName).NodeSelector(nodeSelector). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + NodeSelector: otherNodeSelector, + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}, + }, + }, + }, + expectedStats: Stats{ + NumUpdates: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). + AppOwnerReferences(ownerName).NodeSelector(otherNodeSelector). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, } for name, test := range testCases { t.Run(name, func(t *testing.T) { _, ctx := ktesting.NewTestContext(t) - kubeClient := fake.NewSimpleClientset(test.initialObjects...) 
- owner := Owner{ + inputObjects := make([]runtime.Object, 0, len(test.initialObjects)+len(test.initialOtherObjects)) + for _, initialOtherObject := range test.initialOtherObjects { + inputObjects = append(inputObjects, initialOtherObject.DeepCopyObject()) + } + for _, initialObject := range test.initialObjects { + if _, ok := initialObject.(*resourceapi.ResourceSlice); !ok { + t.Fatalf("test.initialObjects have to be of type *resourceapi.ResourceSlice") + } + inputObjects = append(inputObjects, initialObject.DeepCopyObject()) + } + kubeClient := createTestClient(inputObjects...) + var queue workqueue.Mock[string] + owner := &Owner{ APIVersion: "v1", Kind: "Node", - Name: node, + Name: ownerName, UID: test.nodeUID, } - ctrl, err := newController(ctx, kubeClient, driveName, owner, test.inputDriverResources) + if test.nodeUID == "" { + owner = &Owner{ + APIVersion: "apps/v1", + Kind: "Something", + Name: ownerName, + } + } + if test.noOwner { + owner = nil + } + ctrl, err := newController(ctx, Options{ + DriverName: driverName, + KubeClient: kubeClient, + Owner: owner, + Resources: test.inputDriverResources, + Queue: &queue, + SyncDelay: test.syncDelay, + }) defer ctrl.Stop() require.NoError(t, err, "unexpected controller creation error") - err = ctrl.syncPool(ctx, test.poolName) - require.NoError(t, err, "unexpected syncPool error") + + // Process work items in the queue until the queue is empty. + // Processing races with informers adding new work items, + // but the desired state should already be reached in the + // first iteration, so all following iterations should be nops. + ctrl.run(ctx) // Check ResourceSlices resourceSlices, err := kubeClient.ResourceV1alpha3().ResourceSlices().List(ctx, metav1.ListOptions{}) require.NoError(t, err, "list resource slices") - assert.Equal(t, test.expectedResourceSlices, resourceSlices.Items, "unexpected ResourceSlices") + + sortResourceSlices(test.expectedResourceSlices) + sortResourceSlices(resourceSlices.Items) + assert.Equal(t, test.expectedResourceSlices, resourceSlices.Items) + + assert.Equal(t, test.expectedStats, ctrl.GetStats()) + + // The informer might have added a work item before or after ctrl.run returned, + // therefore we cannot compare the `Later` field. It's either defaultMutationCacheTTL + // (last AddAfter call was after a create) or defaultSyncDelay (last AddAfter was + // from informer event handler). + actualState := queue.State() + actualState.Later = nil + var expectState workqueue.MockState[string] + assert.Equal(t, expectState, actualState) }) } } -func newNode(name string, uid types.UID) *v1.Node { - return &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: uid, - }, +func sortResourceSlices(slices []resourceapi.ResourceSlice) { + sort.Slice(slices, func(i, j int) bool { + if len(slices[i].Name) == 0 && len(slices[j].Name) == 0 { + return slices[i].ObjectMeta.GenerateName < slices[j].ObjectMeta.GenerateName + } + return slices[i].Name < slices[j].Name + }) +} + +func createTestClient(objects ...runtime.Object) *fake.Clientset { + fakeClient := fake.NewSimpleClientset(objects...) + fakeClient.PrependReactor("create", "resourceslices", createResourceSliceCreateReactor()) + fakeClient.PrependReactor("update", "resourceslices", resourceSliceUpdateReactor) + return fakeClient +} + +// createResourceSliceCreateReactor returns a function which +// implements the logic required for the GenerateName field to work when using +// the fake client. Add it with client.PrependReactor to your fake client. 
+func createResourceSliceCreateReactor() func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + nameCounter := 0 + var mutex sync.Mutex + return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + mutex.Lock() + defer mutex.Unlock() + resourceslice := action.(k8stesting.CreateAction).GetObject().(*resourceapi.ResourceSlice) + if resourceslice.Name == "" && resourceslice.GenerateName != "" { + resourceslice.Name = fmt.Sprintf("%s%d", resourceslice.GenerateName, nameCounter) + } + nameCounter++ + return false, nil, nil } } -func newResourceSlice(name, nodeName, driveName, poolName string, poolGeneration int64) *resourceapi.ResourceSlice { - return &resourceapi.ResourceSlice{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: types.UID(name), - }, - Spec: resourceapi.ResourceSliceSpec{ - NodeName: nodeName, - Driver: driveName, - Pool: resourceapi.ResourcePool{ - Name: poolName, - ResourceSliceCount: 1, - Generation: poolGeneration, +// resourceSliceUpdateReactor implements the ResourceVersion bump for a fake client. +func resourceSliceUpdateReactor(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + resourceslice := action.(k8stesting.UpdateAction).GetObject().(*resourceapi.ResourceSlice) + rev := 0 + if resourceslice.ResourceVersion != "" { + oldRev, err := strconv.Atoi(resourceslice.ResourceVersion) + if err != nil { + return false, nil, fmt.Errorf("ResourceVersion %q should have been an int: %w", resourceslice.ResourceVersion, err) + } + rev = oldRev + } + rev++ + resourceslice.ResourceVersion = fmt.Sprintf("%d", rev) + return false, nil, nil +} + +// ResourceSliceWrapper wraps a ResourceSlice. +type ResourceSliceWrapper struct { + resourceapi.ResourceSlice +} + +// MakeResourceSlice creates a wrapper for a ResourceSlice. +func MakeResourceSlice() *ResourceSliceWrapper { + return &ResourceSliceWrapper{ + resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{}, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{}, + Devices: []resourceapi.Device{}, }, }, } } -func newExpectResourceSlice(name, nodeName, nodeUID, driveName, poolName, deviceName string, generateName bool, poolGeneration int64) *resourceapi.ResourceSlice { - resourceSlice := &resourceapi.ResourceSlice{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "v1", - Kind: "Node", - Name: nodeName, - UID: types.UID(nodeUID), - Controller: ptr.To(true), - }, - }, - }, - Spec: resourceapi.ResourceSliceSpec{ - NodeName: nodeName, - Driver: driveName, - Pool: resourceapi.ResourcePool{ - Name: poolName, - ResourceSliceCount: 1, - Generation: poolGeneration, - }, - Devices: []resourceapi.Device{{Name: deviceName}}, - }, - } - - if generateName { - resourceSlice.ObjectMeta.GenerateName = name - } else { - resourceSlice.ObjectMeta.Name = name - resourceSlice.ObjectMeta.UID = types.UID(name) - } - return resourceSlice +// Obj returns the inner ResourceSlice. 
+func (r *ResourceSliceWrapper) Obj() *resourceapi.ResourceSlice {
+	return &r.ResourceSlice
+}
+
+// Name sets the value of ResourceSlice.ObjectMeta.Name
+func (r *ResourceSliceWrapper) Name(name string) *ResourceSliceWrapper {
+	r.ObjectMeta.Name = name
+	return r
+}
+
+// GenerateName sets the value of ResourceSlice.ObjectMeta.GenerateName
+func (r *ResourceSliceWrapper) GenerateName(generateName string) *ResourceSliceWrapper {
+	r.ObjectMeta.GenerateName = generateName
+	return r
+}
+
+// UID sets the value of ResourceSlice.ObjectMeta.UID
+func (r *ResourceSliceWrapper) UID(uid string) *ResourceSliceWrapper {
+	r.ObjectMeta.UID = types.UID(uid)
+	return r
+}
+
+// ResourceVersion sets the value of ResourceSlice.ObjectMeta.ResourceVersion
+func (r *ResourceSliceWrapper) ResourceVersion(rev string) *ResourceSliceWrapper {
+	r.ObjectMeta.ResourceVersion = rev
+	return r
+}
+
+// NodeOwnerReferences sets the value of ResourceSlice.ObjectMeta.OwnerReferences
+// to a v1.Node
+func (r *ResourceSliceWrapper) NodeOwnerReferences(nodeName, nodeUID string) *ResourceSliceWrapper {
+	r.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
+		{
+			APIVersion: "v1",
+			Kind:       "Node",
+			Name:       nodeName,
+			UID:        types.UID(nodeUID),
+			Controller: ptr.To(true),
+		},
+	}
+	return r
+}
+
+// AppOwnerReferences sets the value of ResourceSlice.ObjectMeta.OwnerReferences
+// to some fictional app controller resource
+func (r *ResourceSliceWrapper) AppOwnerReferences(appName string) *ResourceSliceWrapper {
+	r.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
+		{
+			APIVersion: "apps/v1",
+			Kind:       "Something",
+			Name:       appName,
+			Controller: ptr.To(true),
+		},
+	}
+	return r
+}
+
+// Driver sets the value of ResourceSlice.Spec.Driver
+func (r *ResourceSliceWrapper) Driver(driver string) *ResourceSliceWrapper {
+	r.Spec.Driver = driver
+	return r
+}
+
+// Pool sets the value of ResourceSlice.Spec.Pool
+func (r *ResourceSliceWrapper) Pool(pool resourceapi.ResourcePool) *ResourceSliceWrapper {
+	r.Spec.Pool = pool
+	return r
+}
+
+// NodeName sets the value of ResourceSlice.Spec.NodeName
+func (r *ResourceSliceWrapper) NodeName(nodeName string) *ResourceSliceWrapper {
+	r.Spec.NodeName = nodeName
+	return r
+}
+
+// NodeSelector sets the value of ResourceSlice.Spec.NodeSelector
+func (r *ResourceSliceWrapper) NodeSelector(nodeSelector *v1.NodeSelector) *ResourceSliceWrapper {
+	r.Spec.NodeSelector = nodeSelector
+	return r
+}
+
+// AllNodes sets the value of ResourceSlice.Spec.AllNodes
+func (r *ResourceSliceWrapper) AllNodes(allNodes bool) *ResourceSliceWrapper {
+	r.Spec.AllNodes = allNodes
+	return r
+}
+
+// Devices sets the value of ResourceSlice.Spec.Devices
+func (r *ResourceSliceWrapper) Devices(devices []resourceapi.Device) *ResourceSliceWrapper {
+	r.Spec.Devices = devices
+	return r
 }
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/zz_generated.deepcopy.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/zz_generated.deepcopy.go
index 82d68f3ae0f..635e89fcc1b 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/zz_generated.deepcopy.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/zz_generated.deepcopy.go
@@ -73,9 +73,9 @@ func (in *Pool) DeepCopyInto(out *Pool) {
 		*out = new(v1.NodeSelector)
 		(*in).DeepCopyInto(*out)
 	}
-	if in.Devices != nil {
-		in, out := &in.Devices, &out.Devices
-		*out = make([]v1alpha3.Device, len(*in))
+	if in.Slices != nil {
+		in, out := &in.Slices, &out.Slices
+		*out = make([]Slice, len(*in))
 		for i := range *in {
 			(*in)[i].DeepCopyInto(&(*out)[i])
 		}
@@ -92,3 +92,26 @@ func (in *Pool) DeepCopy() *Pool {
 	in.DeepCopyInto(out)
 	return out
 }
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *Slice) DeepCopyInto(out *Slice) {
+	*out = *in
+	if in.Devices != nil {
+		in, out := &in.Devices, &out.Devices
+		*out = make([]v1alpha3.Device, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Slice.
+func (in *Slice) DeepCopy() *Slice {
+	if in == nil {
+		return nil
+	}
+	out := new(Slice)
+	in.DeepCopyInto(out)
+	return out
+}
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
index 7ab3fece1d6..dc19c8aab18 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
@@ -198,6 +198,9 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
 		if pool.IsIncomplete {
 			return nil, fmt.Errorf("claim %s, request %s: asks for all devices, but resource pool %s is currently being updated", klog.KObj(claim), request.Name, pool.PoolID)
 		}
+		if pool.IsInvalid {
+			return nil, fmt.Errorf("claim %s, request %s: asks for all devices, but resource pool %s is currently invalid", klog.KObj(claim), request.Name, pool.PoolID)
+		}
 
 		for _, slice := range pool.Slices {
 			for deviceIndex := range slice.Spec.Devices {
@@ -599,6 +602,13 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
 			continue
 		}
 
+		// If the pool is not valid, then fail now. It's okay when pools of one driver
+		// are invalid if we allocate from some other pool, but it's not safe to
+		// allocate from an invalid pool.
+		if pool.IsInvalid {
+			return false, fmt.Errorf("pool %s is invalid: %s", pool.Pool, pool.InvalidReason)
+		}
+
 		// Finally treat as allocated and move on to the next device.
 		allocated, deallocate, err := alloc.allocateDevice(r, slice.Spec.Devices[deviceIndex].Basic, deviceID, false)
 		if err != nil {
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
index 40a6b7561db..f16c38c9cee 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go
@@ -535,6 +535,28 @@ func TestAllocator(t *testing.T) {
 				deviceAllocationResult(req0, driverA, pool1, device1, false),
 			)},
 		},
+		"duplicate-slice": {
+			claimsToAllocate: objects(claim(claim0, req0, classA)),
+			classes:          objects(class(classA, driverA)),
+			slices: func() []*resourceapi.ResourceSlice {
+				// This simulates the problem that can
+				// (theoretically) occur when the resource
+				// slice controller wants to publish a pool
+				// with two slices but ends up creating some
+				// identical slices under different names
+				// because its informer cache was out-dated on
+				// another sync (see
+				// resourceslicecontroller.go).
+				sliceA := sliceWithOneDevice(slice1, node1, pool1, driverA)
+				sliceA.Spec.Pool.ResourceSliceCount = 2
+				sliceB := sliceA.DeepCopy()
+				sliceB.Name += "-2"
+				return []*resourceapi.ResourceSlice{sliceA, sliceB}
+			}(),
+			node: node(node1, region1),
+
+			expectError: gomega.MatchError(gomega.ContainSubstring(fmt.Sprintf("pool %s is invalid: duplicate device name %s", pool1, device1))),
+		},
 		"no-slices": {
 			claimsToAllocate: objects(claim(claim0, req0, classA)),
 			classes:          objects(class(classA, driverA)),
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go
index 4c1f8e68bbb..64bb46c7b8e 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go
@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	resourceapi "k8s.io/api/resource/v1alpha3"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/sets"
 	resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 )
@@ -30,8 +31,9 @@ import (
 // GatherPools collects information about all resource pools which provide
 // devices that are accessible from the given node.
 //
-// Out-dated slices are silently ignored. Pools may be incomplete, which is
-// recorded in the result.
+// Out-dated slices are silently ignored. Pools may be incomplete (not all
+// required slices available) or invalid (for example, device names not unique).
+// Both are recorded in the result.
 func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceLister, node *v1.Node) ([]*Pool, error) {
 	pools := make(map[PoolID]*Pool)
 
@@ -75,6 +77,7 @@ func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceL
 	result := make([]*Pool, 0, len(pools))
 	for _, pool := range pools {
 		pool.IsIncomplete = int64(len(pool.Slices)) != pool.Slices[0].Spec.Pool.ResourceSliceCount
+		pool.IsInvalid, pool.InvalidReason = poolIsInvalid(pool)
 		result = append(result, pool)
 	}
 
@@ -101,17 +104,32 @@ func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) {
 
 	if slice.Spec.Pool.Generation > pool.Slices[0].Spec.Pool.Generation {
 		// Newer, replaces all old slices.
-		pool.Slices = []*resourceapi.ResourceSlice{slice}
+		pool.Slices = nil
 	}
 
 	// Add to pool.
pool.Slices = append(pool.Slices, slice) } +func poolIsInvalid(pool *Pool) (bool, string) { + devices := sets.New[string]() + for _, slice := range pool.Slices { + for _, device := range slice.Spec.Devices { + if devices.Has(device.Name) { + return true, fmt.Sprintf("duplicate device name %s", device.Name) + } + devices.Insert(device.Name) + } + } + return false, "" +} + type Pool struct { PoolID - IsIncomplete bool - Slices []*resourceapi.ResourceSlice + IsIncomplete bool + IsInvalid bool + InvalidReason string + Slices []*resourceapi.ResourceSlice } type PoolID struct { diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index 5b2fa693354..a183675dfec 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -40,8 +40,10 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/validation" applyv1 "k8s.io/client-go/applyconfigurations/core/v1" "k8s.io/client-go/kubernetes" + "k8s.io/dynamic-resource-allocation/resourceslice" "k8s.io/klog/v2" "k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/framework" @@ -729,6 +731,97 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, // TODO (https://github.com/kubernetes/kubernetes/issues/123699): move most of the test below into `testDriver` so that they get // executed with different parameters. + ginkgo.Context("ResourceSlice Controller", func() { + // This is a stress test for creating many large slices. + // Each slice is as large as API limits allow. + // + // Could become a conformance test because it only depends + // on the apiserver. + f.It("creates slices", func(ctx context.Context) { + // Define desired resource slices. + driverName := f.Namespace.Name + numSlices := 100 + devicePrefix := "dev-" + domainSuffix := ".example.com" + poolName := "network-attached" + domain := strings.Repeat("x", 63 /* TODO(pohly): add to API */ -len(domainSuffix)) + domainSuffix + stringValue := strings.Repeat("v", resourceapi.DeviceAttributeMaxValueLength) + pool := resourceslice.Pool{ + Slices: make([]resourceslice.Slice, numSlices), + } + for i := 0; i < numSlices; i++ { + devices := make([]resourceapi.Device, resourceapi.ResourceSliceMaxDevices) + for e := 0; e < resourceapi.ResourceSliceMaxDevices; e++ { + device := resourceapi.Device{ + Name: devicePrefix + strings.Repeat("x", validation.DNS1035LabelMaxLength-len(devicePrefix)-4) + fmt.Sprintf("%04d", e), + Basic: &resourceapi.BasicDevice{ + Attributes: make(map[resourceapi.QualifiedName]resourceapi.DeviceAttribute, resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice), + }, + } + for j := 0; j < resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice; j++ { + name := resourceapi.QualifiedName(domain + "/" + strings.Repeat("x", resourceapi.DeviceMaxIDLength-4) + fmt.Sprintf("%04d", j)) + device.Basic.Attributes[name] = resourceapi.DeviceAttribute{ + StringValue: &stringValue, + } + } + devices[e] = device + } + pool.Slices[i].Devices = devices + } + resources := &resourceslice.DriverResources{ + Pools: map[string]resourceslice.Pool{poolName: pool}, + } + + ginkgo.By("Creating slices") + mutationCacheTTL := 10 * time.Second + controller, err := resourceslice.StartController(ctx, resourceslice.Options{ + DriverName: driverName, + KubeClient: f.ClientSet, + Resources: resources, + MutationCacheTTL: &mutationCacheTTL, + }) + framework.ExpectNoError(err, "start controller") + ginkgo.DeferCleanup(func(ctx context.Context) { + 
controller.Stop()
+				err := f.ClientSet.ResourceV1alpha3().ResourceSlices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
+					FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + driverName,
+				})
+				framework.ExpectNoError(err, "delete resource slices")
+			})
+
+			// Eventually we should have all desired slices.
+			listSlices := framework.ListObjects(f.ClientSet.ResourceV1alpha3().ResourceSlices().List, metav1.ListOptions{
+				FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + driverName,
+			})
+			gomega.Eventually(ctx, listSlices).WithTimeout(time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(numSlices)))
+
+			// Verify state.
+			expectSlices, err := listSlices(ctx)
+			framework.ExpectNoError(err)
+			gomega.Expect(expectSlices.Items).ShouldNot(gomega.BeEmpty())
+			framework.Logf("Protobuf size of one slice is %d bytes = %d KB.", expectSlices.Items[0].Size(), expectSlices.Items[0].Size()/1024)
+			gomega.Expect(expectSlices.Items[0].Size()).Should(gomega.BeNumerically(">=", 600*1024), "ResourceSlice size")
+			gomega.Expect(expectSlices.Items[0].Size()).Should(gomega.BeNumerically("<", 1024*1024), "ResourceSlice size")
+			expectStats := resourceslice.Stats{NumCreates: int64(numSlices)}
+			gomega.Expect(controller.GetStats()).Should(gomega.Equal(expectStats))
+
+			// No further changes expected now; verify that by checking again for a while.
+			gomega.Consistently(ctx, controller.GetStats).WithTimeout(2 * mutationCacheTTL).Should(gomega.Equal(expectStats))
+
+			// Ask the controller to delete all slices except for one empty slice.
+			ginkgo.By("Deleting slices")
+			resources = resources.DeepCopy()
+			resources.Pools[poolName] = resourceslice.Pool{Slices: []resourceslice.Slice{{}}}
+			controller.Update(resources)
+
+			// One empty slice should remain, after removing the full ones and adding the empty one.
+			emptySlice := gomega.HaveField("Spec.Devices", gomega.BeEmpty())
+			gomega.Eventually(ctx, listSlices).WithTimeout(time.Minute).Should(gomega.HaveField("Items", gomega.ConsistOf(emptySlice)))
+			expectStats = resourceslice.Stats{NumCreates: int64(numSlices) + 1, NumDeletes: int64(numSlices)}
+			gomega.Consistently(ctx, controller.GetStats).WithTimeout(2 * mutationCacheTTL).Should(gomega.Equal(expectStats))
+		})
+	})
+
 	ginkgo.Context("cluster", func() {
 		nodes := NewNodes(f, 1, 1)
 		driver := NewDriver(f, nodes, networkResources)
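
To make it easier to see how these pieces fit together, here is a minimal, hypothetical usage sketch of the resourceslice helper exercised by the tests above: a driver describes its devices as DriverResources, starts the controller with the node as owner, and later hands over an updated desired state. The function name and the driver, pool, and device names are illustrative placeholders, not part of this change.

// Hypothetical sketch, assuming kubeClient, nodeName and nodeUID are supplied
// by the driver. It mirrors the API usage shown in the tests above.
package main

import (
	"context"

	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/resourceslice"
)

func runResourceSliceController(ctx context.Context, kubeClient kubernetes.Interface, nodeName string, nodeUID types.UID) error {
	// Desired state: one pool with a single slice containing one device.
	// Pool and device names are placeholders.
	resources := &resourceslice.DriverResources{
		Pools: map[string]resourceslice.Pool{
			"pool-a": {
				Slices: []resourceslice.Slice{{
					Devices: []resourceapi.Device{{Name: "device-0"}},
				}},
			},
		},
	}

	// Using the node as owner ensures that the published ResourceSlices get
	// garbage-collected when the node is removed. The unit tests above also
	// cover a non-node owner and no owner at all (network-attached devices).
	owner := &resourceslice.Owner{
		APIVersion: "v1",
		Kind:       "Node",
		Name:       nodeName,
		UID:        nodeUID,
	}

	controller, err := resourceslice.StartController(ctx, resourceslice.Options{
		DriverName: "dra.example.com",
		KubeClient: kubeClient,
		Owner:      owner,
		Resources:  resources,
	})
	if err != nil {
		return err
	}
	defer controller.Stop()

	// Later, when the locally detected devices change, hand over a new
	// desired state; the controller reconciles the ResourceSlices in the
	// cluster to match it.
	resources = resources.DeepCopy()
	resources.Pools["pool-a"] = resourceslice.Pool{
		Slices: []resourceslice.Slice{{
			Devices: []resourceapi.Device{{Name: "device-0"}, {Name: "device-1"}},
		}},
	}
	controller.Update(resources)

	<-ctx.Done()
	return nil
}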