diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go
index 40c9f338227..793699c7664 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go
@@ -51,6 +51,13 @@ const (
 	// poolNameIndex is the name for the ResourceSlice store's index function,
 	// which is to index by ResourceSlice.Spec.Pool.Name
 	poolNameIndex = "poolName"
+
+	// Including adds in the mutation cache is not safe: We could add a slice, store it,
+	// and then the slice gets deleted without the informer hearing anything about that.
+	// Then the obsolete slice remains in the mutation cache.
+	//
+	// To mitigate this, we use a TTL and check a pool again once added slices expire.
+	defaultMutationCacheTTL = time.Minute
 )
 
 // Controller synchronizes information about resources of one driver with
@@ -64,8 +71,9 @@ type Controller struct {
 	kubeClient kubernetes.Interface
 	wg         sync.WaitGroup
 	// The queue is keyed with the pool name that needs work.
-	queue      workqueue.TypedRateLimitingInterface[string]
-	sliceStore cache.Indexer
+	queue            workqueue.TypedRateLimitingInterface[string]
+	sliceStore       cache.MutationCache
+	mutationCacheTTL time.Duration
 
 	// Must use atomic access...
 	numCreates int64
@@ -182,6 +190,10 @@ type Options struct {
 
 	// Queue can be used to override the default work queue implementation.
 	Queue workqueue.TypedRateLimitingInterface[string]
+
+	// MutationCacheTTL can be used to change the default TTL of one minute.
+	// See source code for details.
+	MutationCacheTTL *time.Duration
 }
 
 // Stop cancels all background activity and blocks until the controller has stopped.
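
Usage note (not part of the patch): the new Options.MutationCacheTTL field lets callers shorten the one-minute default, for example in an integration test that does not want to wait a full minute for the re-check. A minimal sketch follows; the driver name and the 10-second value are made up, and other required fields such as Owner and Resources are elided.

```go
package example

import (
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/resourceslice"
)

// optionsWithShortTTL builds controller options with a shorter mutation cache
// TTL than the one-minute default. Everything here beyond the Options fields
// shown in the diff above is a placeholder.
func optionsWithShortTTL(kubeClient kubernetes.Interface) resourceslice.Options {
	ttl := 10 * time.Second // arbitrary example value
	return resourceslice.Options{
		DriverName:       "driver.example.com", // hypothetical driver name
		KubeClient:       kubeClient,
		MutationCacheTTL: &ttl,
		// Owner, Resources, etc. omitted for brevity.
	}
}
```
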
@@ -249,12 +261,13 @@ func newController(ctx context.Context, options Options) (*Controller, error) {
 	ctx, cancel := context.WithCancelCause(ctx)
 
 	c := &Controller{
-		cancel:     cancel,
-		kubeClient: options.KubeClient,
-		driverName: options.DriverName,
-		owner:      options.Owner.DeepCopy(),
-		queue:      options.Queue,
-		resources:  options.Resources,
+		cancel:           cancel,
+		kubeClient:       options.KubeClient,
+		driverName:       options.DriverName,
+		owner:            options.Owner.DeepCopy(),
+		queue:            options.Queue,
+		resources:        options.Resources,
+		mutationCacheTTL: defaultMutationCacheTTL,
 	}
 	if c.queue == nil {
 		c.queue = workqueue.NewTypedRateLimitingQueueWithConfig(
@@ -262,7 +275,9 @@ func newController(ctx context.Context, options Options) (*Controller, error) {
 			workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
 		)
 	}
-
+	if options.MutationCacheTTL != nil {
+		c.mutationCacheTTL = *options.MutationCacheTTL
+	}
 	if err := c.initInformer(ctx); err != nil {
 		return nil, err
 	}
@@ -298,7 +313,7 @@ func (c *Controller) initInformer(ctx context.Context) error {
 	}, func(options *metav1.ListOptions) {
 		options.FieldSelector = selector.String()
 	})
-	c.sliceStore = informer.GetIndexer()
+	c.sliceStore = cache.NewIntegerResourceVersionMutationCache(informer.GetStore(), informer.GetIndexer(), c.mutationCacheTTL, true /* includeAdds */)
 	handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
 			slice, ok := obj.(*resourceapi.ResourceSlice)
@@ -562,13 +577,16 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 			slice.Spec.Devices = pool.Slices[i].Devices
 
 			logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
-			if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
+			slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{})
+			if err != nil {
 				return fmt.Errorf("update resource slice: %w", err)
 			}
 			atomic.AddInt64(&c.numUpdates, 1)
+			c.sliceStore.Mutation(slice)
 		}
 
 		// Create new slices.
+		added := false
 		for i := 0; i < len(pool.Slices); i++ {
 			if _, ok := currentSliceForDesiredSlice[i]; ok {
 				// Was handled above through an update.
@@ -608,21 +626,25 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 			// It can happen that we create a missing slice, some
 			// other change than the create causes another sync of
 			// the pool, and then a second slice for the same set
-			// of devices gets created because the controller has
+			// of devices would get created because the controller has
 			// no copy of the first slice instance in its informer
 			// cache yet.
 			//
-			// This is not a problem: a client will either see one
-			// of the two slices and use it or see both and do
-			// nothing because of the duplicated device IDs.
-			//
-			// To avoid creating a second slice, we would have to use a
-			// https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache.
+			// Using a https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache
+			// avoids that.
 			logger.V(5).Info("Creating new resource slice")
-			if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
+			slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{})
+			if err != nil {
 				return fmt.Errorf("create resource slice: %w", err)
 			}
 			atomic.AddInt64(&c.numCreates, 1)
+			c.sliceStore.Mutation(slice)
+			added = true
+		}
+		if added {
+			// Check that the recently added slice(s) really exist even
+			// after they expired from the mutation cache.
+			c.queue.AddAfter(poolName, c.mutationCacheTTL)
 		}
 	} else if len(slices) > 0 {
 		// All are obsolete, pool does not exist anymore.
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go
index 99f72f7bfff..4a7d3d8e277 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go
@@ -674,9 +674,14 @@ func TestControllerSyncPool(t *testing.T) {
 			assert.Equal(t, test.expectedStats, ctrl.GetStats())
 
 			// The informer might have added a work item after ctrl.run returned.
-			state := queue.State()
-			state.Ready = nil
-			assert.Equal(t, workqueue.MockState[string]{}, state)
+			actualState := queue.State()
+			actualState.Ready = nil
+			var expectState workqueue.MockState[string]
+			if test.expectedStats.NumCreates > 0 {
+				expectState.Later = []workqueue.MockDelayedItem[string]{{Item: poolName, Duration: defaultMutationCacheTTL}}
+			}
+			assert.Equal(t, expectState, actualState)
+
 		})
 	}
 }
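
Background note (not part of the patch): for readers unfamiliar with the client-go MutationCache, the sketch below illustrates the semantics the controller now relies on. An object recorded via Mutation() is visible to lookups before the backing informer store has delivered it, and it silently drops out again once the TTL expires, which is why syncPool re-queues the pool with AddAfter(poolName, mutationCacheTTL). The ResourceSlice name and the TTL value are arbitrary.

```go
package example

import (
	"fmt"
	"time"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

// demoMutationCache shows why includeAdds plus a TTL needs a follow-up check:
// the cached add is only a local assumption about what the API server stores.
func demoMutationCache() {
	// Stand-in for the informer's store/indexer; it has not seen any objects yet.
	informerStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
	mutationCache := cache.NewIntegerResourceVersionMutationCache(informerStore, informerStore, time.Minute, true /* includeAdds */)

	// Pretend this is the object returned by a successful Create call.
	created := &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "worker-0-xyz", ResourceVersion: "1"},
	}
	mutationCache.Mutation(created)

	// The underlying store is still empty, but lookups through the mutation
	// cache already return the new slice. After the TTL it is forgotten again,
	// so the controller re-checks the pool once the TTL has elapsed.
	_, visible, _ := mutationCache.GetByKey("worker-0-xyz")
	fmt.Println(visible) // true
}
```
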