From 26650371cc5b44c53e62865a9bd758e4558dcc03 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 22 Oct 2024 22:26:03 +0200 Subject: [PATCH 1/8] DRA resourceslice controller: support publishing multiple slices The driver determines what each slice is meant to look like. The controller then ensures that only those slices exist. It reuses existing slices where the set of devices, as identified by their names, is the same as in some desired slice. Such slices get updated to match the desired state. In other words, attributes and the order of devices can be changed by updating an existing slice, but adding or removing a device is done by deleting and re-creating slices. Co-authored-by: googs1025 The test update is partly based on https://github.com/kubernetes/kubernetes/pull/127645. --- .../internal/workqueue/mockqueue.go | 247 +++++ .../kubeletplugin/draplugin.go | 12 +- .../resourceslice/resourceslicecontroller.go | 352 ++++--- .../resourceslicecontroller_test.go | 863 ++++++++++++++---- .../resourceslice/zz_generated.deepcopy.go | 29 +- 5 files changed, 1228 insertions(+), 275 deletions(-) create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/internal/workqueue/mockqueue.go diff --git a/staging/src/k8s.io/dynamic-resource-allocation/internal/workqueue/mockqueue.go b/staging/src/k8s.io/dynamic-resource-allocation/internal/workqueue/mockqueue.go new file mode 100644 index 00000000000..bf6b6f15b69 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/internal/workqueue/mockqueue.go @@ -0,0 +1,247 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "maps" + "slices" + "sync" + "time" + + "k8s.io/client-go/util/workqueue" +) + +// TODO (pohly): move this to k8s.io/client-go/util/workqueue/mockqueue.go +// if it turns out to be generally useful. Doc comments are already written +// as if the code was there. + +// MockQueue is an implementation of [TypedRateLimitingInterface] which +// can be used to test a function which pulls work items out of a queue +// and processes them. It is thread-safe. +// +// A null instance is directly usable. The usual usage is: +// +// var m workqueue.Mock[string] +// m.SyncOne("some-item", func(queue workqueue.TypedRateLimitingInterface[string]) { ... } ) +// if diff := cmp.Diff(workqueue.MockState[string]{}, m.State()); diff != "" { +// t.Errorf("unexpected state of mock work queue after sync (-want, +got):\n%s", diff) +// } +// +// All slices get reset to nil when they become empty, so there are no spurious +// differences because of nil vs. empty slice. +type Mock[T comparable] struct { + mutex sync.Mutex + state MockState[T] +} + +type MockState[T comparable] struct { + // Ready contains the items which are ready for processing. + Ready []T + + // InFlight contains the items which are currently being processed (= Get + // was called, Done not yet). + InFlight []T + + // MismatchedDone contains the items for which Done was called without + // a matching Get. 
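+	// Such calls indicate a bug in the code which is being tested.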
+ MismatchedDone []T + + // Later contains the items which are meant to be added to the queue after + // a certain delay (= AddAfter was called for them). They appear in the + // order in which AddAfter got called. + Later []MockDelayedItem[T] + + // Failures contains the items and their retry count which failed to be + // processed (AddRateLimited called at least once, Forget not yet). + // The retry count is always larger than zero. + Failures map[T]int + + // ShutDownCalled tracks how often ShutDown got called. + ShutDownCalled int + + // ShutDownWithDrainCalled tracks how often ShutDownWithDrain got called. + ShutDownWithDrainCalled int +} + +// DeepCopy takes a snapshot of all slices. It cannot do a deep copy of the items in those slices, +// but typically those keys are immutable. +func (m MockState[T]) DeepCopy() *MockState[T] { + m.Ready = slices.Clone(m.Ready) + m.InFlight = slices.Clone(m.InFlight) + m.MismatchedDone = slices.Clone(m.MismatchedDone) + m.Later = slices.Clone(m.Later) + m.Failures = maps.Clone(m.Failures) + return &m +} + +// MockDelayedItem is an item which was queue for later processing. +type MockDelayedItem[T comparable] struct { + Item T + Duration time.Duration +} + +// SyncOne adds the item to the work queue and calls sync. +// That sync function can pull one or more items from the work +// queue until the queue is empty. Then it is told that the queue +// is shutting down, which must cause it to return. +// +// The test can then retrieve the state of the queue to check the result. +func (m *Mock[T]) SyncOne(item T, sync func(workqueue.TypedRateLimitingInterface[T])) { + // sync must run with the mutex not locked. + defer sync(m) + m.mutex.Lock() + defer m.mutex.Unlock() + + m.state.Ready = append(m.state.Ready, item) +} + +// State returns the current state of the queue. +func (m *Mock[T]) State() MockState[T] { + m.mutex.Lock() + defer m.mutex.Unlock() + + return *m.state.DeepCopy() +} + +// Add implements [TypedInterface]. +func (m *Mock[T]) Add(item T) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if !slices.Contains(m.state.Ready, item) { + m.state.Ready = append(m.state.Ready, item) + } +} + +// Len implements [TypedInterface]. +func (m *Mock[T]) Len() int { + m.mutex.Lock() + defer m.mutex.Unlock() + + return len(m.state.Ready) +} + +// Get implements [TypedInterface]. +func (m *Mock[T]) Get() (item T, shutdown bool) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if len(m.state.Ready) == 0 { + shutdown = true + return + } + item = m.state.Ready[0] + m.state.Ready = m.state.Ready[1:] + if len(m.state.Ready) == 0 { + m.state.Ready = nil + } + m.state.InFlight = append(m.state.InFlight, item) + return item, false +} + +// Done implements [TypedInterface]. +func (m *Mock[T]) Done(item T) { + m.mutex.Lock() + defer m.mutex.Unlock() + + index := slices.Index(m.state.InFlight, item) + if index < 0 { + m.state.MismatchedDone = append(m.state.MismatchedDone, item) + } + m.state.InFlight = slices.Delete(m.state.InFlight, index, index+1) + if len(m.state.InFlight) == 0 { + m.state.InFlight = nil + } +} + +// ShutDown implements [TypedInterface]. +func (m *Mock[T]) ShutDown() { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.state.ShutDownCalled++ +} + +// ShutDownWithDrain implements [TypedInterface]. +func (m *Mock[T]) ShutDownWithDrain() { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.state.ShutDownWithDrainCalled++ +} + +// ShuttingDown implements [TypedInterface]. 
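+// It returns true after ShutDown or ShutDownWithDrain has been called at least once.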
+func (m *Mock[T]) ShuttingDown() bool { + m.mutex.Lock() + defer m.mutex.Unlock() + + return m.state.ShutDownCalled > 0 || m.state.ShutDownWithDrainCalled > 0 +} + +// AddAfter implements [TypedDelayingInterface.AddAfter] +func (m *Mock[T]) AddAfter(item T, duration time.Duration) { + if duration == 0 { + m.Add(item) + return + } + + m.mutex.Lock() + defer m.mutex.Unlock() + + for i := range m.state.Later { + if m.state.Later[i].Item == item { + // https://github.com/kubernetes/client-go/blob/270e5ab1714527c455865953da8ceba2810dbb50/util/workqueue/delaying_queue.go#L340-L349 + // only shortens the delay for an existing item. It does not make it longer. + if m.state.Later[i].Duration > duration { + m.state.Later[i].Duration = duration + } + return + } + } + + m.state.Later = append(m.state.Later, MockDelayedItem[T]{Item: item, Duration: duration}) +} + +// AddRateLimited implements [TypedRateLimitingInterface.AddRateLimited]. +func (m *Mock[T]) AddRateLimited(item T) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.state.Failures == nil { + m.state.Failures = make(map[T]int) + } + m.state.Failures[item]++ +} + +// Forget implements [TypedRateLimitingInterface.Forget]. +func (m *Mock[T]) Forget(item T) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.state.Failures == nil { + return + } + delete(m.state.Failures, item) +} + +// NumRequeues implements [TypedRateLimitingInterface.NumRequeues]. +func (m *Mock[T]) NumRequeues(item T) int { + m.mutex.Lock() + defer m.mutex.Unlock() + + return m.state.Failures[item] +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index 37dc11ea677..3adbf6b2318 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -393,7 +393,9 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e driverResources := &resourceslice.DriverResources{ Pools: map[string]resourceslice.Pool{ d.nodeName: { - Devices: resources.Devices, + Slices: []resourceslice.Slice{{ + Devices: resources.Devices, + }}, }, }, } @@ -407,7 +409,13 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller") controllerCtx = klog.NewContext(controllerCtx, controllerLogger) var err error - if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources); err != nil { + if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, + resourceslice.Options{ + DriverName: d.driverName, + KubeClient: d.kubeClient, + Owner: &owner, + Resources: driverResources, + }); err != nil { return fmt.Errorf("start ResourceSlice controller: %w", err) } return nil diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 0f869181197..610eb9d2798 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -20,19 +20,20 @@ import ( "context" "errors" "fmt" - "sort" "sync" "time" "github.com/google/go-cmp/cmp" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1alpha3" 
+ apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" resourceinformers "k8s.io/client-go/informers/resource/v1alpha3" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -57,8 +58,8 @@ const ( // controller as part of its kubelet plugin. type Controller struct { cancel func(cause error) - driver string - owner Owner + driverName string + owner *Owner kubeClient kubernetes.Interface wg sync.WaitGroup // The queue is keyed with the pool name that needs work. @@ -94,7 +95,28 @@ type Pool struct { // by the controller. Generation int64 - // Device names must be unique inside the pool. + // Slices is a list of all ResourceSlices that the driver + // wants to publish for this pool. The driver must ensure + // that each resulting slice is valid. See the API + // definition for details, in particular the limit on + // the number of devices. + // + // If slices are not valid, then the controller will + // log errors produced by the apiserver. + // + // Drivers should publish at least one slice for each + // pool that they normally manage, even if that slice + // is empty. "Empty pool" is different from "no pool" + // because it shows that the driver is up-and-running + // and simply doesn't have any devices. + Slices []Slice +} + +// +k8s:deepcopy-gen=true + +// Slice is turned into one ResourceSlice by the controller. +type Slice struct { + // Devices lists all devices which are part of the slice. Devices []resourceapi.Device } @@ -110,19 +132,9 @@ type Owner struct { } // StartController constructs a new controller and starts it. -// If the owner is a v1.Node, then the NodeName field in the -// ResourceSlice objects is set and used to identify objects -// managed by the controller. The UID is not needed in that -// case, the controller will determine it automatically. -// -// If a kubeClient is provided, then it synchronizes ResourceSlices -// with the resource information provided by plugins. Without it, -// the controller is inactive. This can happen when kubelet is run stand-alone -// without an apiserver. In that case we can't and don't need to publish -// ResourceSlices. -func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) { +func StartController(ctx context.Context, options Options) (*Controller, error) { logger := klog.FromContext(ctx) - c, err := newController(ctx, kubeClient, driver, owner, resources) + c, err := newController(ctx, options) if err != nil { return nil, fmt.Errorf("create controller: %w", err) } @@ -134,15 +146,38 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, drive defer logger.V(3).Info("Stopping") c.run(ctx) }() - - // Sync each pool once. - for poolName := range resources.Pools { - c.queue.Add(poolName) - } - return c, nil } +// Options contains various optional settings for [StartController]. +type Options struct { + // DriverName is the required name of the DRA driver. + DriverName string + + // KubeClient is used to read Node objects (if necessary) and to access + // ResourceSlices. It must be specified. 
+ KubeClient kubernetes.Interface + + // If the owner is a v1.Node, then the NodeName field in the + // ResourceSlice objects is set and used to identify objects + // managed by the controller. The UID is not needed in that + // case, the controller will determine it automatically. + // + // The owner must be cluster-scoped. This is not always possible, + // therefore it is optional. A driver without a owner must take + // care that remaining slices get deleted manually as part of + // a driver uninstall because garbage collection won't work. + Owner *Owner + + // This is the initial desired set of slices. As with + // [Controller.Update], the controller takes ownership of the resources + // instance. The content must not get modified by the caller. + Resources *DriverResources + + // Queue can be used to override the default work queue implementation. + Queue workqueue.TypedRateLimitingInterface[string] +} + // Stop cancels all background activity and blocks until the controller has stopped. func (c *Controller) Stop() { if c == nil { @@ -155,7 +190,8 @@ func (c *Controller) Stop() { // Update sets the new desired state of the resource information. // // The controller takes over ownership, so these resources must -// not get modified after this method returns. +// not get modified after this method returns. [DriverResources.DeepCopy] +// can be used by the caller to clone some existing instance. func (c *Controller) Update(resources *DriverResources) { c.mutex.Lock() defer c.mutex.Unlock() @@ -174,28 +210,43 @@ func (c *Controller) Update(resources *DriverResources) { } // newController creates a new controller. -func newController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) { - if kubeClient == nil { - return nil, fmt.Errorf("kubeClient is nil") +func newController(ctx context.Context, options Options) (*Controller, error) { + if options.KubeClient == nil { + return nil, errors.New("KubeClient is nil") + } + if options.DriverName == "" { + return nil, errors.New("DRA driver name is empty") + } + if options.Resources == nil { + return nil, errors.New("DriverResources are nil") } ctx, cancel := context.WithCancelCause(ctx) c := &Controller{ cancel: cancel, - kubeClient: kubeClient, - driver: driver, - owner: owner, - queue: workqueue.NewTypedRateLimitingQueueWithConfig( + kubeClient: options.KubeClient, + driverName: options.DriverName, + owner: options.Owner.DeepCopy(), + queue: options.Queue, + resources: options.Resources, + } + if c.queue == nil { + c.queue = workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"}, - ), - resources: resources, + ) } if err := c.initInformer(ctx); err != nil { return nil, err } + + // Sync each desired pool once. + for poolName := range options.Resources.Pools { + c.queue.Add(poolName) + } + return c, nil } @@ -205,10 +256,10 @@ func (c *Controller) initInformer(ctx context.Context) error { // We always filter by driver name, by node name only for node-local resources. 
selector := fields.Set{ - resourceapi.ResourceSliceSelectorDriver: c.driver, + resourceapi.ResourceSliceSelectorDriver: c.driverName, resourceapi.ResourceSliceSelectorNodeName: "", } - if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { + if c.owner != nil && c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { selector[resourceapi.ResourceSliceSelectorNodeName] = c.owner.Name } informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, cache.Indexers{ @@ -348,7 +399,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { // The result gets cached and is expected to not change while // the controller runs. var nodeName string - if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { + if c.owner != nil && c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { nodeName = c.owner.Name if c.owner.UID == "" { node, err := c.kubeClient.CoreV1().Nodes().Get(ctx, c.owner.Name, metav1.GetOptions{}) @@ -360,8 +411,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { } } - // Slices that don't match any driver resource can either be updated (if there - // are new driver resources that need to be stored) or they need to be deleted. + // Slices that don't match any driver slice need to be deleted. obsoleteSlices := make([]*resourceapi.ResourceSlice, 0, len(slices)) // Determine highest generation. @@ -381,66 +431,162 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { currentSlices = append(currentSlices, slice) } } - slices = currentSlices - - // Sort by name to ensure that keeping only the first slice is deterministic. - sort.Slice(slices, func(i, j int) bool { - return slices[i].Name < slices[j].Name - }) + logger.V(5).Info("Existing slices", "obsolete", klog.KObjSlice(obsoleteSlices), "current", klog.KObjSlice(currentSlices)) if pool, ok := resources.Pools[poolName]; ok { - if pool.Generation > generation { - generation = pool.Generation - } - - // Right now all devices get published in a single slice. - // We simply pick the first one, if there is one, and copy - // it in preparation for updating it. + // Match each existing slice against the desired slices. + // Two slices match if they contain exactly the same + // device IDs, in an arbitrary order. Such a matched + // slice gets updated with the desired content if + // there is a difference. // - // TODO: support splitting across slices, with unit tests. - if len(slices) > 0 { - obsoleteSlices = append(obsoleteSlices, slices[1:]...) - slices = []*resourceapi.ResourceSlice{slices[0].DeepCopy()} - } else { - slices = []*resourceapi.ResourceSlice{ - { - ObjectMeta: metav1.ObjectMeta{ - GenerateName: c.owner.Name + "-" + c.driver + "-", - }, - }, + // This supports updating the definition of devices + // in a slice. Adding or removing devices is done + // by deleting the old slice and creating a new one. + // + // This is primarily a simplification of the code: + // to support adding or removing devices from + // existing slices, we would have to identify "most + // similar" slices (= minimal editing distance). + // + // In currentSliceForDesiredSlice we keep track of + // which desired slice has a matched slice. + // + // At the end of the loop, each current slice is either + // a match or obsolete. 
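+		//
+		// For example, a current slice with devices [a, b] matches a desired
+		// slice with devices [b, a], even if the device attributes differ.
+		// The update below then brings such a matched slice in line with the
+		// desired content.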
+ currentSliceForDesiredSlice := make(map[int]*resourceapi.ResourceSlice, len(pool.Slices)) + for _, currentSlice := range currentSlices { + matched := false + for i := range pool.Slices { + if _, ok := currentSliceForDesiredSlice[i]; ok { + // Already has a match. + continue + } + if sameSlice(currentSlice, &pool.Slices[i]) { + currentSliceForDesiredSlice[i] = currentSlice + logger.V(5).Info("Matched existing slice", "slice", klog.KObj(currentSlice), "matchIndex", i) + matched = true + break + } + } + if !matched { + obsoleteSlices = append(obsoleteSlices, currentSlice) + logger.V(5).Info("Unmatched existing slice", "slice", klog.KObj(currentSlice)) } } - slice := slices[0] - slice.OwnerReferences = []metav1.OwnerReference{{ - APIVersion: c.owner.APIVersion, - Kind: c.owner.Kind, - Name: c.owner.Name, - UID: c.owner.UID, - Controller: ptr.To(true), - }} - slice.Spec.Driver = c.driver - slice.Spec.Pool.Name = poolName - slice.Spec.Pool.Generation = generation - slice.Spec.Pool.ResourceSliceCount = 1 - slice.Spec.NodeName = nodeName - slice.Spec.NodeSelector = pool.NodeSelector - slice.Spec.AllNodes = pool.NodeSelector == nil && nodeName == "" - slice.Spec.Devices = pool.Devices + // Desired metadata which must be set in each slice. + resourceSliceCount := len(pool.Slices) + numMatchedSlices := len(currentSliceForDesiredSlice) + numNewSlices := resourceSliceCount - numMatchedSlices + desiredPool := resourceapi.ResourcePool{ + Name: poolName, + Generation: generation, // May get updated later. + ResourceSliceCount: int64(resourceSliceCount), + } + desiredAllNodes := pool.NodeSelector == nil && nodeName == "" - if loggerV := logger.V(6); loggerV.Enabled() { - // Dump entire resource information. - loggerV.Info("Syncing resource slices", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "pool", pool) - } else { - logger.V(5).Info("Syncing resource slices", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices)) + // Now for each desired slice, figure out which of them are changed. + changedDesiredSlices := sets.New[int]() + for i, currentSlice := range currentSliceForDesiredSlice { + // Reordering entries is a difference and causes an update even if the + // entries are the same. + if !apiequality.Semantic.DeepEqual(¤tSlice.Spec.Pool, &desiredPool) || + !apiequality.Semantic.DeepEqual(currentSlice.Spec.NodeSelector, pool.NodeSelector) || + currentSlice.Spec.AllNodes != desiredAllNodes || + !apiequality.Semantic.DeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) { + changedDesiredSlices.Insert(i) + logger.V(5).Info("Need to update slice", "slice", klog.KObj(currentSlice), "matchIndex", i) + } + } + logger.V(5).Info("Completed comparison", + "numObsolete", len(obsoleteSlices), + "numMatchedSlices", len(currentSliceForDesiredSlice), + "numChangedMatchedSlices", len(changedDesiredSlices), + "numNewSlices", numNewSlices, + ) + + bumpedGeneration := false + switch { + case pool.Generation > generation: + // Bump up the generation if the driver asked for it, or + // start with a non-zero generation. 
+ generation = pool.Generation + bumpedGeneration = true + logger.V(5).Info("Bumped generation to driver-provided generation", "generation", generation) + case numNewSlices == 0 && len(changedDesiredSlices) <= 1: + logger.V(5).Info("Kept generation because at most one update API call is necessary", "generation", generation) + default: + generation++ + bumpedGeneration = true + logger.V(5).Info("Bumped generation by one", "generation", generation) + } + desiredPool.Generation = generation + + // Update existing slices. + for i, currentSlice := range currentSliceForDesiredSlice { + if !changedDesiredSlices.Has(i) && !bumpedGeneration { + continue + } + slice := currentSlice.DeepCopy() + slice.Spec.Pool = desiredPool + // No need to set the node name. If it was different, we wouldn't + // have listed the existing slice. + slice.Spec.NodeSelector = pool.NodeSelector + slice.Spec.AllNodes = desiredAllNodes + slice.Spec.Devices = pool.Slices[i].Devices + + logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice)) + if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update resource slice: %w", err) + } + } + + // Create new slices. + for i := 0; i < len(pool.Slices); i++ { + if _, ok := currentSliceForDesiredSlice[i]; ok { + // Was handled above through an update. + continue + } + var ownerReferences []metav1.OwnerReference + if c.owner != nil { + ownerReferences = append(ownerReferences, + metav1.OwnerReference{ + APIVersion: c.owner.APIVersion, + Kind: c.owner.Kind, + Name: c.owner.Name, + UID: c.owner.UID, + Controller: ptr.To(true), + }, + ) + } + generateName := c.driverName + "-" + if c.owner != nil { + generateName = c.owner.Name + "-" + generateName + } + slice := &resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences, + GenerateName: generateName, + }, + Spec: resourceapi.ResourceSliceSpec{ + Driver: c.driverName, + Pool: desiredPool, + NodeName: nodeName, + NodeSelector: pool.NodeSelector, + AllNodes: desiredAllNodes, + Devices: pool.Slices[i].Devices, + }, + } + logger.V(5).Info("Creating new resource slice") + if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create resource slice: %w", err) + } } } else if len(slices) > 0 { // All are obsolete, pool does not exist anymore. - - logger.V(5).Info("Removing resource slices after pool removal", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices)) - obsoleteSlices = append(obsoleteSlices, slices...) - // No need to create or update the slices. - slices = nil + obsoleteSlices = slices + logger.V(5).Info("Removing resource slices after pool removal") } // Remove stale slices. @@ -451,22 +597,24 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { } } - // Create or update slices. - for _, slice := range slices { - if slice.UID == "" { - logger.V(5).Info("Creating new resource slice", "slice", klog.KObj(slice)) - if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("create resource slice: %w", err) - } - continue - } + return nil +} - // TODO: switch to SSA once unit testing supports it. 
- logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice)) - if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("update resource slice: %w", err) +func sameSlice(existingSlice *resourceapi.ResourceSlice, desiredSlice *Slice) bool { + if len(existingSlice.Spec.Devices) != len(desiredSlice.Devices) { + return false + } + + existingDevices := sets.New[string]() + for _, device := range existingSlice.Spec.Devices { + existingDevices.Insert(device.Name) + } + for _, device := range desiredSlice.Devices { + if !existingDevices.Has(device.Name) { + return false } } - return nil + // Same number of devices, names all present -> equal. + return true } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go index 1ba33da54c5..ef7c7f9ed32 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go @@ -17,6 +17,10 @@ limitations under the License. package resourceslice import ( + "fmt" + "sort" + "strconv" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -28,243 +32,766 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + "k8s.io/dynamic-resource-allocation/internal/workqueue" + "k8s.io/klog/v2" "k8s.io/klog/v2/ktesting" "k8s.io/utils/ptr" ) +func init() { + klog.InitFlags(nil) +} + +// TestControllerSyncPool verifies that syncPool produces the right ResourceSlices. +// Update vs. Create API calls are checked by bumping the ResourceVersion in +// updates. func TestControllerSyncPool(t *testing.T) { var ( - node = "node" + ownerName = "owner" nodeUID = types.UID("node-uid") - driveName = "driver" + driverName = "driver" poolName = "pool" - poolName1 = "pool-1" deviceName = "device" + deviceName1 = "device-1" + deviceName2 = "device-2" + deviceName3 = "device-3" + deviceName4 = "device-4" resourceSlice1 = "resource-slice-1" resourceSlice2 = "resource-slice-2" resourceSlice3 = "resource-slice-3" + generateName = ownerName + "-" + driverName + "-" + generatedName1 = generateName + "0" + basicDevice = &resourceapi.BasicDevice{ + Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "new-attribute": {StringValue: ptr.To("value")}, + }, + } + nodeSelector = &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{{ + MatchFields: []v1.NodeSelectorRequirement{{ + Key: "name", + Values: []string{"node-a"}, + }}, + }}, + } + otherNodeSelector = &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{{ + MatchFields: []v1.NodeSelectorRequirement{{ + Key: "name", + Values: []string{"node-b"}, + }}, + }}, + } ) testCases := map[string]struct { - nodeUID types.UID + // nodeUID is empty if not a node-local. + nodeUID types.UID + // noOwner completely disables setting an owner. + noOwner bool + // initialObjects is a list of initial resource slices to be used in the test. 
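+		// They all have to be of type *resourceapi.ResourceSlice.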
initialObjects []runtime.Object - poolName string + initialOtherObjects []runtime.Object inputDriverResources *DriverResources expectedResourceSlices []resourceapi.ResourceSlice }{ - "single-resourceslice-existing": { - nodeUID: nodeUID, - initialObjects: []runtime.Object{ - newResourceSlice(resourceSlice1, node, driveName, poolName, 1), - }, - poolName: poolName, - inputDriverResources: &DriverResources{ - Pools: map[string]Pool{ - poolName: { - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, - }, - }, - }, - expectedResourceSlices: []resourceapi.ResourceSlice{ - *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1), - }, - }, - "single-resourceslice-existing-with-nodeUID-empty": { - nodeUID: "", - initialObjects: []runtime.Object{ - newNode(node, nodeUID), - newResourceSlice(resourceSlice1, node, driveName, poolName, 1), - }, - poolName: poolName, - inputDriverResources: &DriverResources{ - Pools: map[string]Pool{ - poolName: { - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, - }, - }, - }, - expectedResourceSlices: []resourceapi.ResourceSlice{ - *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1), - }, - }, - "multiple-resourceslices-existing": { - nodeUID: nodeUID, - initialObjects: []runtime.Object{ - newResourceSlice(resourceSlice1, node, driveName, poolName, 1), - newResourceSlice(resourceSlice2, node, driveName, poolName, 1), - newResourceSlice(resourceSlice3, node, driveName, poolName, 1), - }, - poolName: poolName, - inputDriverResources: &DriverResources{ - Pools: map[string]Pool{ - poolName: { - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, - }, - }, - }, - expectedResourceSlices: []resourceapi.ResourceSlice{ - *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1), - }, - }, - "no-resourceslices-existing": { + "create-slice": { nodeUID: nodeUID, initialObjects: []runtime.Object{}, - poolName: poolName, inputDriverResources: &DriverResources{ Pools: map[string]Pool{ poolName: { - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, + Slices: []Slice{{Devices: []resourceapi.Device{}}}, }, }, }, expectedResourceSlices: []resourceapi.ResourceSlice{ - *newExpectResourceSlice(node+"-"+driveName+"-", node, string(nodeUID), driveName, poolName, deviceName, true, 0), + *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), }, }, - "single-resourceslice-existing-with-different-resource-pool-generation": { + "keep-slice-unchanged": { nodeUID: nodeUID, initialObjects: []runtime.Object{ - newResourceSlice(resourceSlice1, node, driveName, poolName, 2), - newResourceSlice(resourceSlice2, node, driveName, poolName, 1), - newResourceSlice(resourceSlice3, node, driveName, poolName, 1), + MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). 
+ Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), }, - poolName: poolName, inputDriverResources: &DriverResources{ Pools: map[string]Pool{ poolName: { - Generation: 3, - Devices: []resourceapi.Device{ - { - Name: deviceName, - }, - }, - }, - }, - }, - expectedResourceSlices: []resourceapi.ResourceSlice{ - *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 3), - }, - }, - "single-resourceslice-existing-with-mismatching-resource-pool-name": { - nodeUID: nodeUID, - initialObjects: []runtime.Object{ - newResourceSlice(resourceSlice1, node, driveName, poolName, 1), - }, - poolName: poolName, - inputDriverResources: &DriverResources{ - Pools: map[string]Pool{ - poolName1: { Generation: 1, - Devices: []resourceapi.Device{ - { - Name: deviceName, + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "remove-pool": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{}, + expectedResourceSlices: nil, + }, + "delete-and-add-slice": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + // no devices + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}}, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 1}).Obj(), + }, + }, + "delete-redundant-slice": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). 
+ Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}}, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "update-slice": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{ + Devices: []resourceapi.Device{{ + Name: deviceName, + Basic: basicDevice, + }}, + }}, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{ + Name: deviceName, + Basic: basicDevice, + }}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "update-slice-many-devices": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}, {Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{ + Devices: []resourceapi.Device{ + {Name: deviceName1}, + { + Name: deviceName2, + Basic: basicDevice, + }, }, + }}, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{ + {Name: deviceName1}, + { + Name: deviceName2, + Basic: basicDevice, + }}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "multiple-resourceslices-existing-no-changes": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). 
+ NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{ + {Devices: []resourceapi.Device{{Name: deviceName1}}}, + {Devices: []resourceapi.Device{{Name: deviceName2}}}, + {Devices: []resourceapi.Device{{Name: deviceName3}}}, }, }, }, }, - expectedResourceSlices: nil, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + }, + "multiple-resourceslices-existing-with-different-resource-pool-generation": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + // no devices + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + // matching device + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 1}).Obj(), + // no devices + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{ + Devices: []resourceapi.Device{ + { + Name: deviceName, + }, + }, + }}, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 1}).Obj(), + }, + }, + "multiple-resourceslices-existing-one-changed": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). 
+ Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{ + {Devices: []resourceapi.Device{{Name: deviceName1}}}, + {Devices: []resourceapi.Device{{Name: deviceName2, Basic: basicDevice}}}, + {Devices: []resourceapi.Device{{Name: deviceName3}}}, + }, + }, + }, + }, + // Generation not bumped, only one update. + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2, Basic: basicDevice}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + }, + "multiple-resourceslices-existing-two-changed": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{ + {Devices: []resourceapi.Device{{Name: deviceName1}}}, + {Devices: []resourceapi.Device{{Name: deviceName2, Basic: basicDevice}}}, + {Devices: []resourceapi.Device{{Name: deviceName3, Basic: basicDevice}}}, + }, + }, + }, + }, + // Generation bumped, all updated. 
+ expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).ResourceVersion("1"). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2, Basic: basicDevice}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 3}).Obj(), + *MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).ResourceVersion("1"). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3, Basic: basicDevice}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 3}).Obj(), + }, + }, + "multiple-resourceslices-existing-one-removed": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{ + {Devices: []resourceapi.Device{{Name: deviceName1}}}, + {Devices: []resourceapi.Device{{Name: deviceName2}}}, + }, + }, + }, + }, + // Generation bumped, two updated, one removed. + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).ResourceVersion("1"). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 2}).Obj(), + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 2}).Obj(), + }, + }, + "multiple-resourceslices-existing-one-added": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). 
+ Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 3}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{ + {Devices: []resourceapi.Device{{Name: deviceName1}}}, + {Devices: []resourceapi.Device{{Name: deviceName2}}}, + {Devices: []resourceapi.Device{{Name: deviceName3}}}, + {Devices: []resourceapi.Device{{Name: deviceName4}}}, + }, + }, + }, + }, + // Three updated, one generated. + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName1}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 4}).Obj(), + *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName2}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 4}).Obj(), + *MakeResourceSlice().Name(resourceSlice3).UID(resourceSlice3).ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName3}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 4}).Obj(), + *MakeResourceSlice().GenerateName(generateName).Name(generatedName1). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName4}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 2, ResourceSliceCount: 4}).Obj(), + }, + }, + "add-one-network-device-all-nodes": { + initialObjects: []runtime.Object{}, + noOwner: true, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(driverName + "-0").GenerateName(driverName + "-"). + AllNodes(true). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "add-one-network-device-some-nodes": { + initialObjects: []runtime.Object{}, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + NodeSelector: nodeSelector, + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + AppOwnerReferences(ownerName).NodeSelector(nodeSelector). 
+ Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + }, + "update-node-selector": { + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). + AppOwnerReferences(ownerName).NodeSelector(nodeSelector). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + NodeSelector: otherNodeSelector, + Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). + AppOwnerReferences(ownerName).NodeSelector(otherNodeSelector). + Driver(driverName).Devices([]resourceapi.Device{{Name: deviceName}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), + }, }, } for name, test := range testCases { t.Run(name, func(t *testing.T) { _, ctx := ktesting.NewTestContext(t) - kubeClient := fake.NewSimpleClientset(test.initialObjects...) - owner := Owner{ + inputObjects := make([]runtime.Object, 0, len(test.initialObjects)+len(test.initialOtherObjects)) + for _, initialOtherObject := range test.initialOtherObjects { + inputObjects = append(inputObjects, initialOtherObject.DeepCopyObject()) + } + for _, initialObject := range test.initialObjects { + if _, ok := initialObject.(*resourceapi.ResourceSlice); !ok { + t.Fatalf("test.initialObjects have to be of type *resourceapi.ResourceSlice") + } + inputObjects = append(inputObjects, initialObject.DeepCopyObject()) + } + kubeClient := createTestClient(inputObjects...) + var queue workqueue.Mock[string] + owner := &Owner{ APIVersion: "v1", Kind: "Node", - Name: node, + Name: ownerName, UID: test.nodeUID, } - ctrl, err := newController(ctx, kubeClient, driveName, owner, test.inputDriverResources) + if test.nodeUID == "" { + owner = &Owner{ + APIVersion: "apps/v1", + Kind: "Something", + Name: ownerName, + } + } + if test.noOwner { + owner = nil + } + ctrl, err := newController(ctx, Options{ + DriverName: driverName, + KubeClient: kubeClient, + Owner: owner, + Resources: test.inputDriverResources, + Queue: &queue, + }) defer ctrl.Stop() require.NoError(t, err, "unexpected controller creation error") - err = ctrl.syncPool(ctx, test.poolName) - require.NoError(t, err, "unexpected syncPool error") + + // Process work items in the queue until the queue is empty. + // Processing races with informers adding new work items, + // but the desired state should already be reached in the + // first iteration, so all following iterations should be nops. + ctrl.run(ctx) // Check ResourceSlices resourceSlices, err := kubeClient.ResourceV1alpha3().ResourceSlices().List(ctx, metav1.ListOptions{}) require.NoError(t, err, "list resource slices") - assert.Equal(t, test.expectedResourceSlices, resourceSlices.Items, "unexpected ResourceSlices") + + sortResourceSlices(test.expectedResourceSlices) + sortResourceSlices(resourceSlices.Items) + assert.Equal(t, test.expectedResourceSlices, resourceSlices.Items) + + // The informer might have added a work item after ctrl.run returned. 
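+			// Those extra items are fine, therefore Ready gets cleared before
+			// comparing against an empty state.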
+ state := queue.State() + state.Ready = nil + assert.Equal(t, workqueue.MockState[string]{}, state) }) } } -func newNode(name string, uid types.UID) *v1.Node { - return &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: uid, - }, +func sortResourceSlices(slices []resourceapi.ResourceSlice) { + sort.Slice(slices, func(i, j int) bool { + if len(slices[i].Name) == 0 && len(slices[j].Name) == 0 { + return slices[i].ObjectMeta.GenerateName < slices[j].ObjectMeta.GenerateName + } + return slices[i].Name < slices[j].Name + }) +} + +func createTestClient(objects ...runtime.Object) *fake.Clientset { + fakeClient := fake.NewSimpleClientset(objects...) + fakeClient.PrependReactor("create", "resourceslices", createResourceSliceCreateReactor()) + fakeClient.PrependReactor("update", "resourceslices", resourceSliceUpdateReactor) + return fakeClient +} + +// createResourceSliceCreateReactor returns a function which +// implements the logic required for the GenerateName field to work when using +// the fake client. Add it with client.PrependReactor to your fake client. +func createResourceSliceCreateReactor() func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + nameCounter := 0 + var mutex sync.Mutex + return func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + mutex.Lock() + defer mutex.Unlock() + resourceslice := action.(k8stesting.CreateAction).GetObject().(*resourceapi.ResourceSlice) + if resourceslice.Name == "" && resourceslice.GenerateName != "" { + resourceslice.Name = fmt.Sprintf("%s%d", resourceslice.GenerateName, nameCounter) + } + nameCounter++ + return false, nil, nil } } -func newResourceSlice(name, nodeName, driveName, poolName string, poolGeneration int64) *resourceapi.ResourceSlice { - return &resourceapi.ResourceSlice{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: types.UID(name), - }, - Spec: resourceapi.ResourceSliceSpec{ - NodeName: nodeName, - Driver: driveName, - Pool: resourceapi.ResourcePool{ - Name: poolName, - ResourceSliceCount: 1, - Generation: poolGeneration, +// resourceSliceUpdateReactor implements the ResourceVersion bump for a fake client. +func resourceSliceUpdateReactor(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + resourceslice := action.(k8stesting.UpdateAction).GetObject().(*resourceapi.ResourceSlice) + rev := 0 + if resourceslice.ResourceVersion != "" { + oldRev, err := strconv.Atoi(resourceslice.ResourceVersion) + if err != nil { + return false, nil, fmt.Errorf("ResourceVersion %q should have been an int: %w", resourceslice.ResourceVersion, err) + } + rev = oldRev + } + rev++ + resourceslice.ResourceVersion = fmt.Sprintf("%d", rev) + return false, nil, nil +} + +// ResourceSliceWrapper wraps a ResourceSlice. +type ResourceSliceWrapper struct { + resourceapi.ResourceSlice +} + +// MakeResourceSlice creates a wrapper for a ResourceSlice. 
+func MakeResourceSlice() *ResourceSliceWrapper { + return &ResourceSliceWrapper{ + resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{}, + Spec: resourceapi.ResourceSliceSpec{ + Pool: resourceapi.ResourcePool{}, + Devices: []resourceapi.Device{}, }, }, } } -func newExpectResourceSlice(name, nodeName, nodeUID, driveName, poolName, deviceName string, generateName bool, poolGeneration int64) *resourceapi.ResourceSlice { - resourceSlice := &resourceapi.ResourceSlice{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "v1", - Kind: "Node", - Name: nodeName, - UID: types.UID(nodeUID), - Controller: ptr.To(true), - }, - }, - }, - Spec: resourceapi.ResourceSliceSpec{ - NodeName: nodeName, - Driver: driveName, - Pool: resourceapi.ResourcePool{ - Name: poolName, - ResourceSliceCount: 1, - Generation: poolGeneration, - }, - Devices: []resourceapi.Device{{Name: deviceName}}, - }, - } - - if generateName { - resourceSlice.ObjectMeta.GenerateName = name - } else { - resourceSlice.ObjectMeta.Name = name - resourceSlice.ObjectMeta.UID = types.UID(name) - } - return resourceSlice +// Obj returns the inner ResourceSlice. +func (r *ResourceSliceWrapper) Obj() *resourceapi.ResourceSlice { + return &r.ResourceSlice +} + +// Name sets the value of ResourceSlice.ObjectMeta.Name +func (r *ResourceSliceWrapper) Name(name string) *ResourceSliceWrapper { + r.ObjectMeta.Name = name + return r +} + +// GenerateName sets the value of ResourceSlice.ObjectMeta.GenerateName +func (r *ResourceSliceWrapper) GenerateName(generateName string) *ResourceSliceWrapper { + r.ObjectMeta.GenerateName = generateName + return r +} + +// UID sets the value of ResourceSlice.ObjectMeta.UID +func (r *ResourceSliceWrapper) UID(uid string) *ResourceSliceWrapper { + r.ObjectMeta.UID = types.UID(uid) + return r +} + +// ResourceVersion sets the value of ResourceSlice.ObjectMeta.ResourceVersion +func (r *ResourceSliceWrapper) ResourceVersion(rev string) *ResourceSliceWrapper { + r.ObjectMeta.ResourceVersion = rev + return r +} + +// NodeOwnerReferences sets the value of ResourceSlice.ObjectMeta.NodeOwnerReferences +// to a v1.Node +func (r *ResourceSliceWrapper) NodeOwnerReferences(nodeName, nodeUID string) *ResourceSliceWrapper { + r.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Node", + Name: nodeName, + UID: types.UID(nodeUID), + Controller: ptr.To(true), + }, + } + return r +} + +// AppOwnerReferences sets the value of ResourceSlice.ObjectMeta.NodeOwnerReferences +// to some fictional app controller resource +func (r *ResourceSliceWrapper) AppOwnerReferences(appName string) *ResourceSliceWrapper { + r.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Something", + Name: appName, + Controller: ptr.To(true), + }, + } + return r +} + +// Driver sets the value of ResourceSlice.Spec.Driver +func (r *ResourceSliceWrapper) Driver(driver string) *ResourceSliceWrapper { + r.Spec.Driver = driver + return r +} + +// Pool sets the value of ResourceSlice.Spec.Pool +func (r *ResourceSliceWrapper) Pool(pool resourceapi.ResourcePool) *ResourceSliceWrapper { + r.Spec.Pool = pool + return r +} + +// NodeName sets the value of ResourceSlice.Spec.NodeName +func (r *ResourceSliceWrapper) NodeName(nodeName string) *ResourceSliceWrapper { + r.Spec.NodeName = nodeName + return r +} + +// NodeSelector sets the value of ResourceSlice.Spec.NodeSelector +func (r *ResourceSliceWrapper) NodeSelector(nodeSelector *v1.NodeSelector) 
*ResourceSliceWrapper { + r.Spec.NodeSelector = nodeSelector + return r +} + +// AllNodes sets the value of ResourceSlice.Spec.AllNodes +func (r *ResourceSliceWrapper) AllNodes(allNodes bool) *ResourceSliceWrapper { + r.Spec.AllNodes = allNodes + return r +} + +// Devices sets the value of ResourceSlice.Spec.Devices +func (r *ResourceSliceWrapper) Devices(devices []resourceapi.Device) *ResourceSliceWrapper { + r.Spec.Devices = devices + return r } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/zz_generated.deepcopy.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/zz_generated.deepcopy.go index 82d68f3ae0f..635e89fcc1b 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/zz_generated.deepcopy.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/zz_generated.deepcopy.go @@ -73,9 +73,9 @@ func (in *Pool) DeepCopyInto(out *Pool) { *out = new(v1.NodeSelector) (*in).DeepCopyInto(*out) } - if in.Devices != nil { - in, out := &in.Devices, &out.Devices - *out = make([]v1alpha3.Device, len(*in)) + if in.Slices != nil { + in, out := &in.Slices, &out.Slices + *out = make([]Slice, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -92,3 +92,26 @@ func (in *Pool) DeepCopy() *Pool { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Slice) DeepCopyInto(out *Slice) { + *out = *in + if in.Devices != nil { + in, out := &in.Devices, &out.Devices + *out = make([]v1alpha3.Device, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Slice. +func (in *Slice) DeepCopy() *Slice { + if in == nil { + return nil + } + out := new(Slice) + in.DeepCopyInto(out) + return out +} From a6d180c7d30d7e68dee78d245e87d0d9dd4ff206 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 25 Oct 2024 21:47:10 +0200 Subject: [PATCH 2/8] DRA: validate set of devices in a pool before using the pool The ResourceSlice controller (theoretically) might end up creating too many slices if it syncs again before its informer cache was updated. This could cause the scheduler to allocate a device from a duplicated slice. They should be identical, but its still better to fail and wait until the controller removes the redundant slice. 
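The check itself reduces to verifying that no device name appears twice across the slices of a pool. A minimal, self-contained sketch of that idea follows; the real helper added below (poolIsInvalid in pools.go) operates on the allocator's internal Pool type, so the function and package names here are only illustrative:

package sketch

import (
	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/apimachinery/pkg/util/sets"
)

// duplicateDevice reports the first device name that is published by more
// than one slice of the same pool. Such a pool must not be allocated from,
// because the duplicate may be a transient artifact of the ResourceSlice
// controller syncing against an out-dated informer cache.
func duplicateDevice(slices []*resourceapi.ResourceSlice) (string, bool) {
	seen := sets.New[string]()
	for _, slice := range slices {
		for _, device := range slice.Spec.Devices {
			if seen.Has(device.Name) {
				return device.Name, true
			}
			seen.Insert(device.Name)
		}
	}
	return "", false
}

Allocation then fails fast for such a pool instead of silently picking one of the identical copies, which keeps behavior deterministic until the controller has cleaned up the redundant slice.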
--- .../resourceslice/resourceslicecontroller.go | 14 ++++++++++ .../structured/allocator.go | 10 +++++++ .../structured/allocator_test.go | 22 +++++++++++++++ .../structured/pools.go | 28 +++++++++++++++---- 4 files changed, 69 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 610eb9d2798..1fcd434cf1f 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -578,6 +578,20 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { Devices: pool.Slices[i].Devices, }, } + + // It can happen that we create a missing slice, some + // other change than the create causes another sync of + // the pool, and then a second slice for the same set + // of devices gets created because the controller has + // no copy of the first slice instance in its informer + // cache yet. + // + // This is not a problem: a client will either see one + // of the two slices and use it or see both and do + // nothing because of the duplicated device IDs. + // + // To avoid creating a second slice, we would have to use a + // https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache. logger.V(5).Info("Creating new resource slice") if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { return fmt.Errorf("create resource slice: %w", err) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index c7bdd1f4247..a620634e8c4 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -187,6 +187,9 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] if pool.IsIncomplete { return nil, fmt.Errorf("claim %s, request %s: asks for all devices, but resource pool %s is currently being updated", klog.KObj(claim), request.Name, pool.PoolID) } + if pool.IsInvalid { + return nil, fmt.Errorf("claim %s, request %s: asks for all devices, but resource pool %s is currently invalid", klog.KObj(claim), request.Name, pool.PoolID) + } for _, slice := range pool.Slices { for deviceIndex := range slice.Spec.Devices { @@ -574,6 +577,13 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) { continue } + // If the pool is not valid, then fail now. It's okay when pools of one driver + // are invalid if we allocate from some other pool, but it's not safe to + // allocated from an invalid pool. + if pool.IsInvalid { + return false, fmt.Errorf("pool %s is invalid: %s", pool.Pool, pool.InvalidReason) + } + // Finally treat as allocated and move on to the next device. 
allocated, deallocate, err := alloc.allocateDevice(r, slice.Spec.Devices[deviceIndex].Basic, deviceID, false) if err != nil { diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index 68b300d7c57..ede057fa470 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -529,6 +529,28 @@ func TestAllocator(t *testing.T) { deviceAllocationResult(req0, driverA, pool1, device1), )}, }, + "duplicate-slice": { + claimsToAllocate: objects(claim(claim0, req0, classA)), + classes: objects(class(classA, driverA)), + slices: func() []*resourceapi.ResourceSlice { + // This simulates the problem that can + // (theoretically) occur when the resource + // slice controller wants to publish a pool + // with two slices but ends up creating some + // identical slices under different names + // because its informer cache was out-dated on + // another sync (see + // resourceslicecontroller.go). + sliceA := sliceWithOneDevice(slice1, node1, pool1, driverA) + sliceA.Spec.Pool.ResourceSliceCount = 2 + sliceB := sliceA.DeepCopy() + sliceB.Name += "-2" + return []*resourceapi.ResourceSlice{sliceA, sliceB} + }(), + node: node(node1, region1), + + expectError: gomega.MatchError(gomega.ContainSubstring(fmt.Sprintf("pool %s is invalid: duplicate device name %s", pool1, device1))), + }, "no-slices": { claimsToAllocate: objects(claim(claim0, req0, classA)), classes: objects(class(classA, driverA)), diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go index 4c1f8e68bbb..64bb46c7b8e 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1alpha3" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" resourcelisters "k8s.io/client-go/listers/resource/v1alpha3" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" ) @@ -30,8 +31,9 @@ import ( // GatherPools collects information about all resource pools which provide // devices that are accessible from the given node. // -// Out-dated slices are silently ignored. Pools may be incomplete, which is -// recorded in the result. +// Out-dated slices are silently ignored. Pools may be incomplete (not all +// required slices available) or invalid (for example, device names not unique). +// Both is recorded in the result. func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceLister, node *v1.Node) ([]*Pool, error) { pools := make(map[PoolID]*Pool) @@ -75,6 +77,7 @@ func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceL result := make([]*Pool, 0, len(pools)) for _, pool := range pools { pool.IsIncomplete = int64(len(pool.Slices)) != pool.Slices[0].Spec.Pool.ResourceSliceCount + pool.IsInvalid, pool.InvalidReason = poolIsInvalid(pool) result = append(result, pool) } @@ -101,17 +104,32 @@ func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) { if slice.Spec.Pool.Generation > pool.Slices[0].Spec.Pool.Generation { // Newer, replaces all old slices. - pool.Slices = []*resourceapi.ResourceSlice{slice} + pool.Slices = nil } // Add to pool. 
pool.Slices = append(pool.Slices, slice) } +func poolIsInvalid(pool *Pool) (bool, string) { + devices := sets.New[string]() + for _, slice := range pool.Slices { + for _, device := range slice.Spec.Devices { + if devices.Has(device.Name) { + return true, fmt.Sprintf("duplicate device name %s", device.Name) + } + devices.Insert(device.Name) + } + } + return false, "" +} + type Pool struct { PoolID - IsIncomplete bool - Slices []*resourceapi.ResourceSlice + IsIncomplete bool + IsInvalid bool + InvalidReason string + Slices []*resourceapi.ResourceSlice } type PoolID struct { From d94752ebc87395fd90bc38ad60f0ebe7b8d56ae7 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Sat, 26 Oct 2024 20:48:54 +0200 Subject: [PATCH 3/8] DRA resourceslice controller: use preconditions for Delete It's better to verify UID and ResourceVersion of the ResourceSlice that we want to delete. If anything changed, the decision to remove it might not apply anymore and we need to check again. --- .../resourceslice/resourceslicecontroller.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 1fcd434cf1f..6e4c1852c77 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -605,8 +605,22 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { // Remove stale slices. for _, slice := range obsoleteSlices { - logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice)) - if err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + options := metav1.DeleteOptions{ + Preconditions: &metav1.Preconditions{ + UID: &slice.UID, + ResourceVersion: &slice.ResourceVersion, + }, + } + // It can happen that we sync again shortly after deleting a + // slice and before the slice gets removed from the informer + // cache. The MutationCache can't help here because it does not + // track pending deletes. + // + // If this happens, we get a "not found error" and nothing + // changes on the server. The only downside is the extra API + // call. This isn't as bad as extra creates. + logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice), "deleteOptions", options) + if err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, options); err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("delete resource slice: %w", err) } } From e88d5c37e67a45f6fa30d0e06badd79853f3736e Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Sun, 27 Oct 2024 13:36:12 +0100 Subject: [PATCH 4/8] DRA resource claim controller: add statistics This is primarily for testing. Proper metrics might be useful, but can still be added later. 
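The statistics are deliberately lightweight: three counters that the sync loop bumps with sync/atomic and that GetStats reads as a snapshot, so tests can poll them without taking the controller's mutex. A small sketch of the pattern in isolation (type and field names here are illustrative, not the controller's):

package sketch

import "sync/atomic"

// opCounters mimics the controller's create/update/delete counters: writers
// increment atomically from the sync loop, readers take a snapshot without
// holding any lock.
type opCounters struct {
	creates, updates, deletes int64
}

type statsSnapshot struct {
	Creates, Updates, Deletes int64
}

func (c *opCounters) recordCreate() { atomic.AddInt64(&c.creates, 1) }

func (c *opCounters) snapshot() statsSnapshot {
	return statsSnapshot{
		Creates: atomic.LoadInt64(&c.creates),
		Updates: atomic.LoadInt64(&c.updates),
		Deletes: atomic.LoadInt64(&c.deletes),
	}
}

Because the three loads are not performed under a single lock, a snapshot taken while a sync is in flight may mix counts from before and after an operation; for test assertions made after the queue has drained, that is good enough.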
--- .../resourceslice/resourceslicecontroller.go | 35 ++++++++++++- .../resourceslicecontroller_test.go | 51 ++++++++++++++++++- 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 6e4c1852c77..40c9f338227 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/google/go-cmp/cmp" @@ -66,6 +67,11 @@ type Controller struct { queue workqueue.TypedRateLimitingInterface[string] sliceStore cache.Indexer + // Must use atomic access... + numCreates int64 + numUpdates int64 + numDeletes int64 + mutex sync.RWMutex // When receiving updates from the driver, the entire pointer replaced, @@ -209,6 +215,25 @@ func (c *Controller) Update(resources *DriverResources) { } } +// GetStats provides some insights into operations of the controller. +func (c *Controller) GetStats() Stats { + s := Stats{ + NumCreates: atomic.LoadInt64(&c.numCreates), + NumUpdates: atomic.LoadInt64(&c.numUpdates), + NumDeletes: atomic.LoadInt64(&c.numDeletes), + } + return s +} + +type Stats struct { + // NumCreates counts the number of ResourceSlices that got created. + NumCreates int64 + // NumUpdates counts the number of ResourceSlices that got update. + NumUpdates int64 + // NumDeletes counts the number of ResourceSlices that got deleted. + NumDeletes int64 +} + // newController creates a new controller. func newController(ctx context.Context, options Options) (*Controller, error) { if options.KubeClient == nil { @@ -540,6 +565,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { return fmt.Errorf("update resource slice: %w", err) } + atomic.AddInt64(&c.numUpdates, 1) } // Create new slices. @@ -596,6 +622,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { return fmt.Errorf("create resource slice: %w", err) } + atomic.AddInt64(&c.numCreates, 1) } } else if len(slices) > 0 { // All are obsolete, pool does not exist anymore. @@ -620,7 +647,13 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { // changes on the server. The only downside is the extra API // call. This isn't as bad as extra creates. 
logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice), "deleteOptions", options) - if err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, options); err != nil && !apierrors.IsNotFound(err) { + err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, options) + switch { + case err == nil: + atomic.AddInt64(&c.numDeletes, 1) + case apierrors.IsNotFound(err): + // okay + default: return fmt.Errorf("delete resource slice: %w", err) } } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go index ef7c7f9ed32..99f72f7bfff 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go @@ -96,6 +96,7 @@ func TestControllerSyncPool(t *testing.T) { initialOtherObjects []runtime.Object inputDriverResources *DriverResources expectedResourceSlices []resourceapi.ResourceSlice + expectedStats Stats }{ "create-slice": { nodeUID: nodeUID, @@ -107,6 +108,9 @@ func TestControllerSyncPool(t *testing.T) { }, }, }, + expectedStats: Stats{ + NumCreates: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). @@ -145,7 +149,10 @@ func TestControllerSyncPool(t *testing.T) { Driver(driverName).Devices([]resourceapi.Device{}). Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), }, - inputDriverResources: &DriverResources{}, + inputDriverResources: &DriverResources{}, + expectedStats: Stats{ + NumDeletes: 1, + }, expectedResourceSlices: nil, }, "delete-and-add-slice": { @@ -163,6 +170,10 @@ func TestControllerSyncPool(t *testing.T) { Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}}, }, }, + expectedStats: Stats{ + NumDeletes: 1, + NumCreates: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). @@ -188,6 +199,9 @@ func TestControllerSyncPool(t *testing.T) { Slices: []Slice{{Devices: []resourceapi.Device{{Name: deviceName}}}}}, }, }, + expectedStats: Stats{ + NumDeletes: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). @@ -215,6 +229,9 @@ func TestControllerSyncPool(t *testing.T) { }, }, }, + expectedStats: Stats{ + NumUpdates: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). @@ -248,6 +265,9 @@ func TestControllerSyncPool(t *testing.T) { }, }, }, + expectedStats: Stats{ + NumUpdates: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). 
@@ -334,6 +354,9 @@ func TestControllerSyncPool(t *testing.T) { }, }, }, + expectedStats: Stats{ + NumDeletes: 2, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(resourceSlice2).UID(resourceSlice2). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). @@ -369,6 +392,9 @@ func TestControllerSyncPool(t *testing.T) { }, }, // Generation not bumped, only one update. + expectedStats: Stats{ + NumUpdates: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). @@ -412,6 +438,9 @@ func TestControllerSyncPool(t *testing.T) { }, }, // Generation bumped, all updated. + expectedStats: Stats{ + NumUpdates: 3, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).ResourceVersion("1"). @@ -454,6 +483,10 @@ func TestControllerSyncPool(t *testing.T) { }, }, // Generation bumped, two updated, one removed. + expectedStats: Stats{ + NumUpdates: 2, + NumDeletes: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).ResourceVersion("1"). @@ -493,7 +526,10 @@ func TestControllerSyncPool(t *testing.T) { }, }, }, - // Three updated, one generated. + expectedStats: Stats{ + NumUpdates: 3, + NumCreates: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). @@ -523,6 +559,9 @@ func TestControllerSyncPool(t *testing.T) { }, }, }, + expectedStats: Stats{ + NumCreates: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(driverName + "-0").GenerateName(driverName + "-"). AllNodes(true). @@ -540,6 +579,9 @@ func TestControllerSyncPool(t *testing.T) { }, }, }, + expectedStats: Stats{ + NumCreates: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). AppOwnerReferences(ownerName).NodeSelector(nodeSelector). @@ -562,6 +604,9 @@ func TestControllerSyncPool(t *testing.T) { }, }, }, + expectedStats: Stats{ + NumUpdates: 1, + }, expectedResourceSlices: []resourceapi.ResourceSlice{ *MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1).ResourceVersion("1"). AppOwnerReferences(ownerName).NodeSelector(otherNodeSelector). @@ -626,6 +671,8 @@ func TestControllerSyncPool(t *testing.T) { sortResourceSlices(resourceSlices.Items) assert.Equal(t, test.expectedResourceSlices, resourceSlices.Items) + assert.Equal(t, test.expectedStats, ctrl.GetStats()) + // The informer might have added a work item after ctrl.run returned. state := queue.State() state.Ready = nil From 7473e643fa159332433a6d0558b0ce6e00dab12a Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Sun, 27 Oct 2024 13:32:56 +0100 Subject: [PATCH 5/8] DRA resource slice controller: use MutationCache to avoid race This avoids the problem of creating an additional slice when the one from the previous sync is not in the informer cache yet. It also avoids false attempts to delete slices which were updated in the previous sync. Such attempts would fail the ResourceVersion precondition check, but would still cause work for the apiserver. 
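The write path becomes: every slice the controller creates or updates is immediately stored back into a MutationCache layered over the informer's store, so the next sync of the pool sees the result even if the corresponding watch event has not arrived yet. A rough sketch of that pattern for the create case, assuming the cache was built with cache.NewIntegerResourceVersionMutationCache as in the controller (the helper name createAndRemember is made up):

package sketch

import (
	"context"
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// createAndRemember creates a ResourceSlice and records the server's response
// in the mutation cache, so a follow-up sync does not conclude that the slice
// is still missing and create a duplicate.
func createAndRemember(ctx context.Context, kubeClient kubernetes.Interface, sliceStore cache.MutationCache, slice *resourceapi.ResourceSlice) error {
	created, err := kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("create resource slice: %w", err)
	}
	// The server-assigned name and resourceVersion are exactly what the next
	// sync needs in order to treat this slice as already existing.
	sliceStore.Mutation(created)
	return nil
}

Remembering adds is not entirely safe (a delete that the informer never reports would leave a phantom entry behind), which is why the controller pairs the cache with a TTL and re-queues the pool once recently added entries have expired.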
--- .../resourceslice/resourceslicecontroller.go | 60 +++++++++++++------ .../resourceslicecontroller_test.go | 11 +++- 2 files changed, 49 insertions(+), 22 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 40c9f338227..793699c7664 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -51,6 +51,13 @@ const ( // poolNameIndex is the name for the ResourceSlice store's index function, // which is to index by ResourceSlice.Spec.Pool.Name poolNameIndex = "poolName" + + // Including adds in the mutation cache is not safe: We could add a slice, store it, + // and then the slice gets deleted without the informer hearing anything about that. + // Then the obsolete slice remains in the mutation cache. + // + // To mitigate this, we use a TTL and check a pool again once added slices expire. + defaultMutationCacheTTL = time.Minute ) // Controller synchronizes information about resources of one driver with @@ -64,8 +71,9 @@ type Controller struct { kubeClient kubernetes.Interface wg sync.WaitGroup // The queue is keyed with the pool name that needs work. - queue workqueue.TypedRateLimitingInterface[string] - sliceStore cache.Indexer + queue workqueue.TypedRateLimitingInterface[string] + sliceStore cache.MutationCache + mutationCacheTTL time.Duration // Must use atomic access... numCreates int64 @@ -182,6 +190,10 @@ type Options struct { // Queue can be used to override the default work queue implementation. Queue workqueue.TypedRateLimitingInterface[string] + + // MutationCacheTTL can be used to change the default TTL of one minute. + // See source code for details. + MutationCacheTTL *time.Duration } // Stop cancels all background activity and blocks until the controller has stopped. 
@@ -249,12 +261,13 @@ func newController(ctx context.Context, options Options) (*Controller, error) { ctx, cancel := context.WithCancelCause(ctx) c := &Controller{ - cancel: cancel, - kubeClient: options.KubeClient, - driverName: options.DriverName, - owner: options.Owner.DeepCopy(), - queue: options.Queue, - resources: options.Resources, + cancel: cancel, + kubeClient: options.KubeClient, + driverName: options.DriverName, + owner: options.Owner.DeepCopy(), + queue: options.Queue, + resources: options.Resources, + mutationCacheTTL: defaultMutationCacheTTL, } if c.queue == nil { c.queue = workqueue.NewTypedRateLimitingQueueWithConfig( @@ -262,7 +275,9 @@ func newController(ctx context.Context, options Options) (*Controller, error) { workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"}, ) } - + if options.MutationCacheTTL != nil { + c.mutationCacheTTL = *options.MutationCacheTTL + } if err := c.initInformer(ctx); err != nil { return nil, err } @@ -298,7 +313,7 @@ func (c *Controller) initInformer(ctx context.Context) error { }, func(options *metav1.ListOptions) { options.FieldSelector = selector.String() }) - c.sliceStore = informer.GetIndexer() + c.sliceStore = cache.NewIntegerResourceVersionMutationCache(informer.GetStore(), informer.GetIndexer(), c.mutationCacheTTL, true /* includeAdds */) handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { slice, ok := obj.(*resourceapi.ResourceSlice) @@ -562,13 +577,16 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { slice.Spec.Devices = pool.Slices[i].Devices logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice)) - if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { + slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}) + if err != nil { return fmt.Errorf("update resource slice: %w", err) } atomic.AddInt64(&c.numUpdates, 1) + c.sliceStore.Mutation(slice) } // Create new slices. + added := false for i := 0; i < len(pool.Slices); i++ { if _, ok := currentSliceForDesiredSlice[i]; ok { // Was handled above through an update. @@ -608,21 +626,25 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { // It can happen that we create a missing slice, some // other change than the create causes another sync of // the pool, and then a second slice for the same set - // of devices gets created because the controller has + // of devices would get created because the controller has // no copy of the first slice instance in its informer // cache yet. // - // This is not a problem: a client will either see one - // of the two slices and use it or see both and do - // nothing because of the duplicated device IDs. - // - // To avoid creating a second slice, we would have to use a - // https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache. + // Using a https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache + // avoids that. 
logger.V(5).Info("Creating new resource slice") - if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { + slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}) + if err != nil { return fmt.Errorf("create resource slice: %w", err) } atomic.AddInt64(&c.numCreates, 1) + c.sliceStore.Mutation(slice) + added = true + } + if added { + // Check that the recently added slice(s) really exist even + // after they expired from the mutation cache. + c.queue.AddAfter(poolName, c.mutationCacheTTL) } } else if len(slices) > 0 { // All are obsolete, pool does not exist anymore. diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go index 99f72f7bfff..4a7d3d8e277 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go @@ -674,9 +674,14 @@ func TestControllerSyncPool(t *testing.T) { assert.Equal(t, test.expectedStats, ctrl.GetStats()) // The informer might have added a work item after ctrl.run returned. - state := queue.State() - state.Ready = nil - assert.Equal(t, workqueue.MockState[string]{}, state) + actualState := queue.State() + actualState.Ready = nil + var expectState workqueue.MockState[string] + if test.expectedStats.NumCreates > 0 { + expectState.Later = []workqueue.MockDelayedItem[string]{{Item: poolName, Duration: defaultMutationCacheTTL}} + } + assert.Equal(t, expectState, actualState) + }) } } From 99cf2d8a2e6f0095b993ba592d88fc03517305bc Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Sun, 27 Oct 2024 11:53:57 +0100 Subject: [PATCH 6/8] DRA resource slice controller: add E2E test This test covers creating and deleting 100 large ResourceSlices. It is strict about using the minimum number of calls. The test also verifies that creating large slices works. --- test/e2e/dra/dra.go | 93 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index d03193a3f4c..295faf32268 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -39,8 +39,10 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/validation" applyv1 "k8s.io/client-go/applyconfigurations/core/v1" "k8s.io/client-go/kubernetes" + "k8s.io/dynamic-resource-allocation/resourceslice" "k8s.io/klog/v2" "k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/framework" @@ -727,6 +729,97 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, // TODO (https://github.com/kubernetes/kubernetes/issues/123699): move most of the test below into `testDriver` so that they get // executed with different parameters. + ginkgo.Context("ResourceSlice Controller", func() { + // This is a stress test for creating many large slices. + // Each slice is as large as API limits allow. + // + // Could become a conformance test because it only depends + // on the apiserver. + f.It("creates slices", func(ctx context.Context) { + // Define desired resource slices. 
+ driverName := f.Namespace.Name + numSlices := 100 + devicePrefix := "dev-" + domainSuffix := ".example.com" + poolName := "network-attached" + domain := strings.Repeat("x", 63 /* TODO(pohly): add to API */ -len(domainSuffix)) + domainSuffix + stringValue := strings.Repeat("v", resourceapi.DeviceAttributeMaxValueLength) + pool := resourceslice.Pool{ + Slices: make([]resourceslice.Slice, numSlices), + } + for i := 0; i < numSlices; i++ { + devices := make([]resourceapi.Device, resourceapi.ResourceSliceMaxDevices) + for e := 0; e < resourceapi.ResourceSliceMaxDevices; e++ { + device := resourceapi.Device{ + Name: devicePrefix + strings.Repeat("x", validation.DNS1035LabelMaxLength-len(devicePrefix)-4) + fmt.Sprintf("%04d", e), + Basic: &resourceapi.BasicDevice{ + Attributes: make(map[resourceapi.QualifiedName]resourceapi.DeviceAttribute, resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice), + }, + } + for j := 0; j < resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice; j++ { + name := resourceapi.QualifiedName(domain + "/" + strings.Repeat("x", resourceapi.DeviceMaxIDLength-4) + fmt.Sprintf("%04d", j)) + device.Basic.Attributes[name] = resourceapi.DeviceAttribute{ + StringValue: &stringValue, + } + } + devices[e] = device + } + pool.Slices[i].Devices = devices + } + resources := &resourceslice.DriverResources{ + Pools: map[string]resourceslice.Pool{poolName: pool}, + } + + ginkgo.By("Creating slices") + mutationCacheTTL := 10 * time.Second + controller, err := resourceslice.StartController(ctx, resourceslice.Options{ + DriverName: driverName, + KubeClient: f.ClientSet, + Resources: resources, + MutationCacheTTL: &mutationCacheTTL, + }) + framework.ExpectNoError(err, "start controller") + ginkgo.DeferCleanup(func(ctx context.Context) { + controller.Stop() + err := f.ClientSet.ResourceV1alpha3().ResourceSlices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{ + FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + driverName, + }) + framework.ExpectNoError(err, "delete resource slices") + }) + + // Eventually we should have all desired slices. + listSlices := framework.ListObjects(f.ClientSet.ResourceV1alpha3().ResourceSlices().List, metav1.ListOptions{ + FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + driverName, + }) + gomega.Eventually(ctx, listSlices).WithTimeout(time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(numSlices))) + + // Verify state. + expectSlices, err := listSlices(ctx) + framework.ExpectNoError(err) + gomega.Expect(expectSlices.Items).ShouldNot(gomega.BeEmpty()) + framework.Logf("Protobuf size of one slice is %d bytes = %d KB.", expectSlices.Items[0].Size(), expectSlices.Items[0].Size()/1024) + gomega.Expect(expectSlices.Items[0].Size()).Should(gomega.BeNumerically(">=", 600*1024), "ResourceSlice size") + gomega.Expect(expectSlices.Items[0].Size()).Should(gomega.BeNumerically("<", 1024*1024), "ResourceSlice size") + expectStats := resourceslice.Stats{NumCreates: int64(numSlices)} + gomega.Expect(controller.GetStats()).Should(gomega.Equal(expectStats)) + + // No further changes expected now, after after checking again. + gomega.Consistently(ctx, controller.GetStats).WithTimeout(2 * mutationCacheTTL).Should(gomega.Equal(expectStats)) + + // Ask the controller to delete all slices except for one empty slice. 
+ ginkgo.By("Deleting slices") + resources = resources.DeepCopy() + resources.Pools[poolName] = resourceslice.Pool{Slices: []resourceslice.Slice{{}}} + controller.Update(resources) + + // One empty slice should remain, after removing the full ones and adding the empty one. + emptySlice := gomega.HaveField("Spec.Devices", gomega.BeEmpty()) + gomega.Eventually(ctx, listSlices).WithTimeout(time.Minute).Should(gomega.HaveField("Items", gomega.ConsistOf(emptySlice))) + expectStats = resourceslice.Stats{NumCreates: int64(numSlices) + 1, NumDeletes: int64(numSlices)} + gomega.Consistently(ctx, controller.GetStats).WithTimeout(2 * mutationCacheTTL).Should(gomega.Equal(expectStats)) + }) + }) + ginkgo.Context("cluster", func() { nodes := NewNodes(f, 1, 1) driver := NewDriver(f, nodes, networkResources) From 67f0428769c9096783a6002b2d7cb1c880c4f489 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 29 Oct 2024 15:02:56 +0100 Subject: [PATCH 7/8] DRA resourceslice controller: delay sync When deleting a bunch of slices, the delete events queue the pool while it is being synced. It then got synced again immediately, while the deleted slices were still being removed from the informer cache. The obsolete slice in the cache caused the controller to delete it again, which fails with a "not found". That error is ignored, but this still caused extra API calls. Now syncing gets delayed with a configuration duration (default: 30 seconds) so the informer cache is more likely to be up-to-date when the pool gets synced again. --- .../resourceslice/resourceslicecontroller.go | 33 ++++++++++++++----- .../resourceslicecontroller_test.go | 17 ++++++---- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 793699c7664..8d86a3ad2aa 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -58,6 +58,13 @@ const ( // // To mitigate this, we use a TTL and check a pool again once added slices expire. defaultMutationCacheTTL = time.Minute + + // defaultSyncDelay defines how long to wait between receiving the most recent + // informer event and syncing again. This is long enough that the informer cache + // should be up-to-date (matter mostly for deletes because an out-dated cache + // causes redundant delete API calls) and not too long that a human mistake + // doesn't get fixed while that human is waiting for it. + defaultSyncDelay = 30 * time.Second ) // Controller synchronizes information about resources of one driver with @@ -74,6 +81,7 @@ type Controller struct { queue workqueue.TypedRateLimitingInterface[string] sliceStore cache.MutationCache mutationCacheTTL time.Duration + syncDelay time.Duration // Must use atomic access... numCreates int64 @@ -194,6 +202,15 @@ type Options struct { // MutationCacheTTL can be used to change the default TTL of one minute. // See source code for details. MutationCacheTTL *time.Duration + + // SyncDelay defines how long to wait between receiving the most recent + // informer event and syncing again. The default is 30 seconds. 
+ // + // This is long enough that the informer cache should be up-to-date + // (matter mostly for deletes because an out-dated cache causes + // redundant delete API calls) and not too long that a human mistake + // doesn't get fixed while that human is waiting for it. + SyncDelay *time.Duration } // Stop cancels all background activity and blocks until the controller has stopped. @@ -267,7 +284,8 @@ func newController(ctx context.Context, options Options) (*Controller, error) { owner: options.Owner.DeepCopy(), queue: options.Queue, resources: options.Resources, - mutationCacheTTL: defaultMutationCacheTTL, + mutationCacheTTL: ptr.Deref(options.MutationCacheTTL, defaultMutationCacheTTL), + syncDelay: ptr.Deref(options.SyncDelay, defaultSyncDelay), } if c.queue == nil { c.queue = workqueue.NewTypedRateLimitingQueueWithConfig( @@ -275,9 +293,6 @@ func newController(ctx context.Context, options Options) (*Controller, error) { workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"}, ) } - if options.MutationCacheTTL != nil { - c.mutationCacheTTL = *options.MutationCacheTTL - } if err := c.initInformer(ctx); err != nil { return nil, err } @@ -321,7 +336,7 @@ func (c *Controller) initInformer(ctx context.Context) error { return } logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice)) - c.queue.Add(slice.Spec.Pool.Name) + c.queue.AddAfter(slice.Spec.Pool.Name, c.syncDelay) }, UpdateFunc: func(old, new any) { oldSlice, ok := old.(*resourceapi.ResourceSlice) @@ -337,8 +352,8 @@ func (c *Controller) initInformer(ctx context.Context) error { } else { logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice)) } - c.queue.Add(oldSlice.Spec.Pool.Name) - c.queue.Add(newSlice.Spec.Pool.Name) + c.queue.AddAfter(oldSlice.Spec.Pool.Name, c.syncDelay) + c.queue.AddAfter(newSlice.Spec.Pool.Name, c.syncDelay) }, DeleteFunc: func(obj any) { if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { @@ -349,7 +364,7 @@ func (c *Controller) initInformer(ctx context.Context) error { return } logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice)) - c.queue.Add(slice.Spec.Pool.Name) + c.queue.AddAfter(slice.Spec.Pool.Name, c.syncDelay) }, }) if err != nil { @@ -674,7 +689,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { case err == nil: atomic.AddInt64(&c.numDeletes, 1) case apierrors.IsNotFound(err): - // okay + logger.V(5).Info("Resource slice was already deleted earlier", "slice", klog.KObj(slice)) default: return fmt.Errorf("delete resource slice: %w", err) } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go index 4a7d3d8e277..dd461d35251 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go @@ -22,6 +22,7 @@ import ( "strconv" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -87,6 +88,7 @@ func TestControllerSyncPool(t *testing.T) { ) testCases := map[string]struct { + syncDelay *time.Duration // nodeUID is empty if not a node-local. nodeUID types.UID // noOwner completely disables setting an owner. 
@@ -142,7 +144,8 @@ func TestControllerSyncPool(t *testing.T) { }, }, "remove-pool": { - nodeUID: nodeUID, + nodeUID: nodeUID, + syncDelay: ptr.To(time.Duration(0)), initialObjects: []runtime.Object{ MakeResourceSlice().Name(resourceSlice1).UID(resourceSlice1). NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). @@ -653,6 +656,7 @@ func TestControllerSyncPool(t *testing.T) { Owner: owner, Resources: test.inputDriverResources, Queue: &queue, + SyncDelay: test.syncDelay, }) defer ctrl.Stop() require.NoError(t, err, "unexpected controller creation error") @@ -673,15 +677,14 @@ func TestControllerSyncPool(t *testing.T) { assert.Equal(t, test.expectedStats, ctrl.GetStats()) - // The informer might have added a work item after ctrl.run returned. + // The informer might have added a work item before or after ctrl.run returned, + // therefore we cannot compare the `Later` field. It's either defaultMutationCacheTTL + // (last AddAfter call was after a create) or defaultSyncDelay (last AddAfter was + // from informer event handler). actualState := queue.State() - actualState.Ready = nil + actualState.Later = nil var expectState workqueue.MockState[string] - if test.expectedStats.NumCreates > 0 { - expectState.Later = []workqueue.MockDelayedItem[string]{{Item: poolName, Duration: defaultMutationCacheTTL}} - } assert.Equal(t, expectState, actualState) - }) } } From 1088f4fb44a2149bec0330a4d97e944f5af8e691 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 30 Oct 2024 11:48:53 +0100 Subject: [PATCH 8/8] DRA resourceslice controller: do DeepCopy for driver resources The reason for the previous behavior was unnecessary performance overhead that occurs when the caller already provided a "fresh" copy and doesn't touch it afterwards. But this is something that DRA driver developers can easily get wrong, so it's better to be safe than sorry. --- .../resourceslice/resourceslicecontroller.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 8d86a3ad2aa..7a5341b6219 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -191,9 +191,7 @@ type Options struct { // a driver uninstall because garbage collection won't work. Owner *Owner - // This is the initial desired set of slices. As with - // [Controller.Update], the controller takes ownership of the resources - // instance. The content must not get modified by the caller. + // This is the initial desired set of slices. Resources *DriverResources // Queue can be used to override the default work queue implementation. @@ -224,9 +222,8 @@ func (c *Controller) Stop() { // Update sets the new desired state of the resource information. // -// The controller takes over ownership, so these resources must -// not get modified after this method returns. [DriverResources.DeepCopy] -// can be used by the caller to clone some existing instance. +// The controller is doing a deep copy, so the caller may update +// the instance once Update returns. 
func (c *Controller) Update(resources *DriverResources) { c.mutex.Lock() defer c.mutex.Unlock() @@ -236,7 +233,7 @@ func (c *Controller) Update(resources *DriverResources) { c.queue.Add(poolName) } - c.resources = resources + c.resources = resources.DeepCopy() // ... and the new ones (might be the same). for poolName := range c.resources.Pools { @@ -283,7 +280,7 @@ func newController(ctx context.Context, options Options) (*Controller, error) { driverName: options.DriverName, owner: options.Owner.DeepCopy(), queue: options.Queue, - resources: options.Resources, + resources: options.Resources.DeepCopy(), mutationCacheTTL: ptr.Deref(options.MutationCacheTTL, defaultMutationCacheTTL), syncDelay: ptr.Deref(options.SyncDelay, defaultSyncDelay), }
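For driver authors the net effect is a simpler contract: the same DriverResources value can be kept around and modified between Update calls, because both StartController and Update now work on their own deep copy. A hedged usage sketch (pool, device, and driver names are invented for illustration):

package sketch

import (
	"context"

	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/resourceslice"
)

// publish starts the controller with an initial pool and later mutates the
// same DriverResources instance in place before publishing again, which is
// safe now that the controller stores its own copy.
func publish(ctx context.Context, kubeClient kubernetes.Interface) error {
	resources := &resourceslice.DriverResources{
		Pools: map[string]resourceslice.Pool{
			"pool-a": {Slices: []resourceslice.Slice{
				{Devices: []resourceapi.Device{{Name: "dev-0"}}},
			}},
		},
	}
	controller, err := resourceslice.StartController(ctx, resourceslice.Options{
		DriverName: "driver.example.com",
		KubeClient: kubeClient,
		Resources:  resources,
	})
	if err != nil {
		return err
	}
	defer controller.Stop()

	// Add a device to the existing pool and publish the new desired state.
	// No fresh copy of resources is required anymore.
	pool := resources.Pools["pool-a"]
	pool.Slices = append(pool.Slices, resourceslice.Slice{Devices: []resourceapi.Device{{Name: "dev-1"}}})
	resources.Pools["pool-a"] = pool
	controller.Update(resources)
	return nil
}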