From 913340aa83969fa06cdce6df29ae89e777aa8c98 Mon Sep 17 00:00:00 2001 From: googs1025 Date: Thu, 22 Aug 2024 13:15:15 +0800 Subject: [PATCH] dra(resourceslice): use index lookup instead of using sliceStore.List() --- .../kubeletplugin/draplugin.go | 5 +- .../resourceslice/resourceslicecontroller.go | 103 ++++--- .../resourceslicecontroller_test.go | 270 ++++++++++++++++++ test/e2e/dra/test-driver/app/kubeletplugin.go | 2 +- 4 files changed, 344 insertions(+), 36 deletions(-) create mode 100644 staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index 04eb8261223..6209b09c882 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -406,7 +406,10 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e controllerLogger := klog.FromContext(controllerCtx) controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller") controllerCtx = klog.NewContext(controllerCtx, controllerLogger) - d.resourceSliceController = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources) + var err error + if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources); err != nil { + return fmt.Errorf("start ResourceSlice controller: %w", err) + } return nil } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 01fd88be1ba..2c7d2c2a557 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "sort" "sync" "time" @@ -44,6 +45,10 @@ const ( // resyncPeriod for informer // TODO (https://github.com/kubernetes/kubernetes/issues/123688): disable? resyncPeriod = time.Duration(10 * time.Minute) + + // poolNameIndex is the name for the ResourceSlice store's index function, + // which is to index by ResourceSlice.Spec.Pool.Name + poolNameIndex = "poolName" ) // Controller synchronizes information about resources of one driver with @@ -58,7 +63,7 @@ type Controller struct { wg sync.WaitGroup // The queue is keyed with the pool name that needs work. queue workqueue.TypedRateLimitingInterface[string] - sliceStore cache.Store + sliceStore cache.Indexer mutex sync.RWMutex @@ -109,24 +114,11 @@ type Owner struct { // the controller is inactive. This can happen when kubelet is run stand-alone // without an apiserver. In that case we can't and don't need to publish // ResourceSlices. 
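For reference, a minimal sketch (not part of this patch) of what the new poolName index does on a client-go cache: the index function maps each stored ResourceSlice to its Spec.Pool.Name, so ByIndex can fetch one pool's slices without listing and filtering everything. The index name, object names, and pool names below are illustrative.

package main

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// An Indexer is a Store plus named index functions; each function maps a
	// stored object to the key(s) it should be retrievable under.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		"poolName": func(obj interface{}) ([]string, error) {
			slice, ok := obj.(*resourceapi.ResourceSlice)
			if !ok {
				return nil, nil
			}
			return []string{slice.Spec.Pool.Name}, nil
		},
	})

	_ = indexer.Add(&resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "slice-a"},
		Spec:       resourceapi.ResourceSliceSpec{Pool: resourceapi.ResourcePool{Name: "pool-1"}},
	})
	_ = indexer.Add(&resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "slice-b"},
		Spec:       resourceapi.ResourceSliceSpec{Pool: resourceapi.ResourcePool{Name: "pool-2"}},
	})

	// ByIndex returns only the objects whose index function produced "pool-1".
	objs, _ := indexer.ByIndex("poolName", "pool-1")
	fmt.Println(len(objs)) // 1
}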
-func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) *Controller { - if kubeClient == nil { - return nil - } - +func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) { logger := klog.FromContext(ctx) - ctx, cancel := context.WithCancelCause(ctx) - - c := &Controller{ - cancel: cancel, - kubeClient: kubeClient, - driver: driver, - owner: owner, - queue: workqueue.NewTypedRateLimitingQueueWithConfig( - workqueue.DefaultTypedControllerRateLimiter[string](), - workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"}, - ), - resources: resources, + c, err := newController(ctx, kubeClient, driver, owner, resources) + if err != nil { + return nil, fmt.Errorf("create controller: %w", err) } logger.V(3).Info("Starting") @@ -142,7 +134,7 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, drive c.queue.Add(poolName) } - return c + return c, nil } // Stop cancels all background activity and blocks until the controller has stopped. @@ -175,9 +167,34 @@ func (c *Controller) Update(resources *DriverResources) { } } -// run is running in the background. It handles blocking initialization (like -// syncing the informer) and then syncs the actual with the desired state. -func (c *Controller) run(ctx context.Context) { +// newController creates a new controller. +func newController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) { + if kubeClient == nil { + return nil, fmt.Errorf("kubeClient is nil") + } + + ctx, cancel := context.WithCancelCause(ctx) + + c := &Controller{ + cancel: cancel, + kubeClient: kubeClient, + driver: driver, + owner: owner, + queue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[string](), + workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"}, + ), + resources: resources, + } + + if err := c.initInformer(ctx); err != nil { + return nil, err + } + return c, nil +} + +// initInformer initializes the informer used to watch for changes to the resources slice. +func (c *Controller) initInformer(ctx context.Context) error { logger := klog.FromContext(ctx) // We always filter by driver name, by node name only for node-local resources. 
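newController now owns the cancellable context (context.WithCancelCause), and the informer-sync wait further down surfaces why it was interrupted via context.Cause instead of returning silently. A minimal, self-contained sketch of that pattern, assuming the hypothetical names waitForSynced and synced:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForSynced polls a sync condition; if the context is cancelled first, it
// wraps the cancellation cause into the returned error.
func waitForSynced(ctx context.Context, synced func() bool) error {
	for !synced() {
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return fmt.Errorf("sync informer: %w", context.Cause(ctx))
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errors.New("controller stopped"))
	fmt.Println(waitForSynced(ctx, func() bool { return false }))
	// Prints: sync informer: controller stopped
}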
@@ -185,10 +202,18 @@ func (c *Controller) run(ctx context.Context) { if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { selector[resourceapi.ResourceSliceSelectorNodeName] = c.owner.Name } - informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) { + informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, cache.Indexers{ + poolNameIndex: func(obj interface{}) ([]string, error) { + slice, ok := obj.(*resourceapi.ResourceSlice) + if !ok { + return []string{}, nil + } + return []string{slice.Spec.Pool.Name}, nil + }, + }, func(options *metav1.ListOptions) { options.FieldSelector = selector.String() }) - c.sliceStore = informer.GetStore() + c.sliceStore = informer.GetIndexer() handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { slice, ok := obj.(*resourceapi.ResourceSlice) @@ -228,10 +253,8 @@ func (c *Controller) run(ctx context.Context) { }, }) if err != nil { - logger.Error(err, "Registering event handler on the ResourceSlice informer failed, disabling resource monitoring") - return + return fmt.Errorf("registering event handler on the ResourceSlice informer: %w", err) } - // Start informer and wait for our cache to be populated. logger.V(3).Info("Starting ResourceSlice informer and waiting for it to sync") c.wg.Add(1) @@ -245,13 +268,15 @@ func (c *Controller) run(ctx context.Context) { select { case <-time.After(time.Second): case <-ctx.Done(): - return + return fmt.Errorf("sync ResourceSlice informer: %w", context.Cause(ctx)) } } logger.V(3).Info("ResourceSlice informer has synced") + return nil +} - // Seed the - +// run is running in the background. +func (c *Controller) run(ctx context.Context) { for c.processNextWorkItem(ctx) { } } @@ -295,10 +320,13 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { logger := klog.FromContext(ctx) // Gather information about the actual and desired state. - // TODO: index by pool name. var slices []*resourceapi.ResourceSlice - for _, obj := range c.sliceStore.List() { - if slice, ok := obj.(*resourceapi.ResourceSlice); ok && slice.Spec.Pool.Name == poolName { + objs, err := c.sliceStore.ByIndex(poolNameIndex, poolName) + if err != nil { + return fmt.Errorf("retrieve ResourceSlice objects: %w", err) + } + for _, obj := range objs { + if slice, ok := obj.(*resourceapi.ResourceSlice); ok { slices = append(slices, slice) } } @@ -346,6 +374,11 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { } slices = currentSlices + // Sort by name to ensure that keeping only the first slice is deterministic. + sort.Slice(slices, func(i, j int) bool { + return slices[i].Name < slices[j].Name + }) + if pool, ok := resources.Pools[poolName]; ok { if pool.Generation > generation { generation = pool.Generation @@ -397,6 +430,8 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { logger.V(5).Info("Removing resource slices after pool removal", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices)) obsoleteSlices = append(obsoleteSlices, slices...) + // No need to create or update the slices. + slices = nil } // Remove stale slices. @@ -420,7 +455,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { // TODO: switch to SSA once unit testing supports it. 
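The added sort.Slice call makes the "keep the first existing slice, treat the rest as obsolete" step deterministic, because objects come back from the store in no particular order. A small illustration, with slice names mirroring the test fixtures below:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Store/list order is not guaranteed, so sort by name before picking the
	// slice to keep; the remainder is deleted as obsolete.
	names := []string{"resource-slice-3", "resource-slice-1", "resource-slice-2"}
	sort.Slice(names, func(i, j int) bool { return names[i] < names[j] })
	keep, obsolete := names[0], names[1:]
	fmt.Println(keep, obsolete) // resource-slice-1 [resource-slice-2 resource-slice-3]
}

This is what the multiple-resourceslices-existing test case below relies on: with three existing slices and a single desired device, only resource-slice-1 survives.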
logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice)) if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("delete resource slice: %w", err) + return fmt.Errorf("update resource slice: %w", err) } } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go new file mode 100644 index 00000000000..1ba33da54c5 --- /dev/null +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go @@ -0,0 +1,270 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourceslice + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1alpha3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2/ktesting" + "k8s.io/utils/ptr" +) + +func TestControllerSyncPool(t *testing.T) { + + var ( + node = "node" + nodeUID = types.UID("node-uid") + driveName = "driver" + poolName = "pool" + poolName1 = "pool-1" + deviceName = "device" + resourceSlice1 = "resource-slice-1" + resourceSlice2 = "resource-slice-2" + resourceSlice3 = "resource-slice-3" + ) + + testCases := map[string]struct { + nodeUID types.UID + initialObjects []runtime.Object + poolName string + inputDriverResources *DriverResources + expectedResourceSlices []resourceapi.ResourceSlice + }{ + "single-resourceslice-existing": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + newResourceSlice(resourceSlice1, node, driveName, poolName, 1), + }, + poolName: poolName, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Devices: []resourceapi.Device{ + { + Name: deviceName, + }, + }, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1), + }, + }, + "single-resourceslice-existing-with-nodeUID-empty": { + nodeUID: "", + initialObjects: []runtime.Object{ + newNode(node, nodeUID), + newResourceSlice(resourceSlice1, node, driveName, poolName, 1), + }, + poolName: poolName, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Devices: []resourceapi.Device{ + { + Name: deviceName, + }, + }, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1), + }, + }, + "multiple-resourceslices-existing": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + newResourceSlice(resourceSlice1, node, driveName, poolName, 1), + newResourceSlice(resourceSlice2, node, driveName, poolName, 1), + 
newResourceSlice(resourceSlice3, node, driveName, poolName, 1), + }, + poolName: poolName, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Devices: []resourceapi.Device{ + { + Name: deviceName, + }, + }, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1), + }, + }, + "no-resourceslices-existing": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{}, + poolName: poolName, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Devices: []resourceapi.Device{ + { + Name: deviceName, + }, + }, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *newExpectResourceSlice(node+"-"+driveName+"-", node, string(nodeUID), driveName, poolName, deviceName, true, 0), + }, + }, + "single-resourceslice-existing-with-different-resource-pool-generation": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + newResourceSlice(resourceSlice1, node, driveName, poolName, 2), + newResourceSlice(resourceSlice2, node, driveName, poolName, 1), + newResourceSlice(resourceSlice3, node, driveName, poolName, 1), + }, + poolName: poolName, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Generation: 3, + Devices: []resourceapi.Device{ + { + Name: deviceName, + }, + }, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 3), + }, + }, + "single-resourceslice-existing-with-mismatching-resource-pool-name": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + newResourceSlice(resourceSlice1, node, driveName, poolName, 1), + }, + poolName: poolName, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName1: { + Generation: 1, + Devices: []resourceapi.Device{ + { + Name: deviceName, + }, + }, + }, + }, + }, + expectedResourceSlices: nil, + }, + } + + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + kubeClient := fake.NewSimpleClientset(test.initialObjects...) 
+ owner := Owner{ + APIVersion: "v1", + Kind: "Node", + Name: node, + UID: test.nodeUID, + } + ctrl, err := newController(ctx, kubeClient, driveName, owner, test.inputDriverResources) + defer ctrl.Stop() + require.NoError(t, err, "unexpected controller creation error") + err = ctrl.syncPool(ctx, test.poolName) + require.NoError(t, err, "unexpected syncPool error") + + // Check ResourceSlices + resourceSlices, err := kubeClient.ResourceV1alpha3().ResourceSlices().List(ctx, metav1.ListOptions{}) + require.NoError(t, err, "list resource slices") + assert.Equal(t, test.expectedResourceSlices, resourceSlices.Items, "unexpected ResourceSlices") + }) + } +} + +func newNode(name string, uid types.UID) *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: uid, + }, + } +} + +func newResourceSlice(name, nodeName, driveName, poolName string, poolGeneration int64) *resourceapi.ResourceSlice { + return &resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(name), + }, + Spec: resourceapi.ResourceSliceSpec{ + NodeName: nodeName, + Driver: driveName, + Pool: resourceapi.ResourcePool{ + Name: poolName, + ResourceSliceCount: 1, + Generation: poolGeneration, + }, + }, + } +} + +func newExpectResourceSlice(name, nodeName, nodeUID, driveName, poolName, deviceName string, generateName bool, poolGeneration int64) *resourceapi.ResourceSlice { + resourceSlice := &resourceapi.ResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Node", + Name: nodeName, + UID: types.UID(nodeUID), + Controller: ptr.To(true), + }, + }, + }, + Spec: resourceapi.ResourceSliceSpec{ + NodeName: nodeName, + Driver: driveName, + Pool: resourceapi.ResourcePool{ + Name: poolName, + ResourceSliceCount: 1, + Generation: poolGeneration, + }, + Devices: []resourceapi.Device{{Name: deviceName}}, + }, + } + + if generateName { + resourceSlice.ObjectMeta.GenerateName = name + } else { + resourceSlice.ObjectMeta.Name = name + resourceSlice.ObjectMeta.UID = types.UID(name) + } + return resourceSlice +} diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 1cf2716b7fb..de7c114ee34 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -204,7 +204,7 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube return ex, nil } -// stop ensures that all servers are stopped and resources freed. +// Stop ensures that all servers are stopped and resources freed. func (ex *ExamplePlugin) Stop() { ex.d.Stop() }
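Since StartController now returns (*Controller, error), callers outside kubeletplugin need the same treatment as PublishResources above. A minimal sketch of a hypothetical caller; startPublishing, the driver name, and the pool contents are illustrative and not part of this patch:

package example

import (
	"context"
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/resourceslice"
)

// startPublishing wires up the controller for a node-local driver and
// propagates any startup error to the caller.
func startPublishing(ctx context.Context, kubeClient kubernetes.Interface, nodeName string, nodeUID types.UID) (*resourceslice.Controller, error) {
	owner := resourceslice.Owner{APIVersion: "v1", Kind: "Node", Name: nodeName, UID: nodeUID}
	resources := &resourceslice.DriverResources{
		Pools: map[string]resourceslice.Pool{
			"pool": {Devices: []resourceapi.Device{{Name: "device-0"}}},
		},
	}
	controller, err := resourceslice.StartController(ctx, kubeClient, "example.com/driver", owner, resources)
	if err != nil {
		return nil, fmt.Errorf("start ResourceSlice controller: %w", err)
	}
	return controller, nil
}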