dra(resourceslice): use index lookup instead of sliceStore.List()

googs1025 2024-08-22 13:15:15 +08:00
parent 7c599d37c0
commit 913340aa83
4 changed files with 344 additions and 36 deletions
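
For context, a minimal standalone sketch (not taken from this commit) of the client-go indexer pattern the change adopts: register an index function over Spec.Pool.Name, then fetch the matching ResourceSlices with ByIndex instead of filtering the output of List(). The object and pool names below are illustrative only.

package main

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

const poolNameIndex = "poolName"

func main() {
	// Indexer keyed by object name, with an extra index over Spec.Pool.Name.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		poolNameIndex: func(obj interface{}) ([]string, error) {
			slice, ok := obj.(*resourceapi.ResourceSlice)
			if !ok {
				return nil, nil
			}
			return []string{slice.Spec.Pool.Name}, nil
		},
	})

	// Populate the store with slices belonging to different pools (hypothetical names).
	for _, s := range []struct{ name, pool string }{
		{"slice-a", "pool-1"},
		{"slice-b", "pool-2"},
	} {
		_ = indexer.Add(&resourceapi.ResourceSlice{
			ObjectMeta: metav1.ObjectMeta{Name: s.name},
			Spec: resourceapi.ResourceSliceSpec{
				Pool: resourceapi.ResourcePool{Name: s.pool},
			},
		})
	}

	// Lookup by index value instead of iterating over everything List() returns.
	objs, err := indexer.ByIndex(poolNameIndex, "pool-1")
	if err != nil {
		panic(err)
	}
	for _, obj := range objs {
		fmt.Println(obj.(*resourceapi.ResourceSlice).Name)
	}
}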

View File

@ -406,7 +406,10 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e
controllerLogger := klog.FromContext(controllerCtx)
controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")
controllerCtx = klog.NewContext(controllerCtx, controllerLogger)
d.resourceSliceController = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources)
var err error
if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources); err != nil {
return fmt.Errorf("start ResourceSlice controller: %w", err)
}
return nil
}

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"sync"
"time"
@ -44,6 +45,10 @@ const (
// resyncPeriod for informer
// TODO (https://github.com/kubernetes/kubernetes/issues/123688): disable?
resyncPeriod = time.Duration(10 * time.Minute)
// poolNameIndex is the name of the ResourceSlice store's index function,
// which indexes slices by ResourceSlice.Spec.Pool.Name.
poolNameIndex = "poolName"
)
// Controller synchronizes information about resources of one driver with
@ -58,7 +63,7 @@ type Controller struct {
wg sync.WaitGroup
// The queue is keyed with the pool name that needs work.
queue workqueue.TypedRateLimitingInterface[string]
sliceStore cache.Store
sliceStore cache.Indexer
mutex sync.RWMutex
@ -109,24 +114,11 @@ type Owner struct {
// the controller is inactive. This can happen when kubelet is run stand-alone
// without an apiserver. In that case we can't and don't need to publish
// ResourceSlices.
func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) *Controller {
if kubeClient == nil {
return nil
}
func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) {
logger := klog.FromContext(ctx)
ctx, cancel := context.WithCancelCause(ctx)
c := &Controller{
cancel: cancel,
kubeClient: kubeClient,
driver: driver,
owner: owner,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
),
resources: resources,
c, err := newController(ctx, kubeClient, driver, owner, resources)
if err != nil {
return nil, fmt.Errorf("create controller: %w", err)
}
logger.V(3).Info("Starting")
@ -142,7 +134,7 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, drive
c.queue.Add(poolName)
}
return c
return c, nil
}
// Stop cancels all background activity and blocks until the controller has stopped.
@ -175,9 +167,34 @@ func (c *Controller) Update(resources *DriverResources) {
}
}
// run is running in the background. It handles blocking initialization (like
// syncing the informer) and then syncs the actual with the desired state.
func (c *Controller) run(ctx context.Context) {
// newController creates a new controller.
func newController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) {
if kubeClient == nil {
return nil, fmt.Errorf("kubeClient is nil")
}
ctx, cancel := context.WithCancelCause(ctx)
c := &Controller{
cancel: cancel,
kubeClient: kubeClient,
driver: driver,
owner: owner,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
),
resources: resources,
}
if err := c.initInformer(ctx); err != nil {
return nil, err
}
return c, nil
}
// initInformer initializes the informer used to watch for changes to ResourceSlices.
func (c *Controller) initInformer(ctx context.Context) error {
logger := klog.FromContext(ctx)
// We always filter by driver name, by node name only for node-local resources.
@ -185,10 +202,18 @@ func (c *Controller) run(ctx context.Context) {
if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
selector[resourceapi.ResourceSliceSelectorNodeName] = c.owner.Name
}
informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, cache.Indexers{
poolNameIndex: func(obj interface{}) ([]string, error) {
slice, ok := obj.(*resourceapi.ResourceSlice)
if !ok {
return []string{}, nil
}
return []string{slice.Spec.Pool.Name}, nil
},
}, func(options *metav1.ListOptions) {
options.FieldSelector = selector.String()
})
c.sliceStore = informer.GetStore()
c.sliceStore = informer.GetIndexer()
handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
slice, ok := obj.(*resourceapi.ResourceSlice)
@ -228,10 +253,8 @@ func (c *Controller) run(ctx context.Context) {
},
})
if err != nil {
logger.Error(err, "Registering event handler on the ResourceSlice informer failed, disabling resource monitoring")
return
return fmt.Errorf("registering event handler on the ResourceSlice informer: %w", err)
}
// Start informer and wait for our cache to be populated.
logger.V(3).Info("Starting ResourceSlice informer and waiting for it to sync")
c.wg.Add(1)
@ -245,13 +268,15 @@ func (c *Controller) run(ctx context.Context) {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
return fmt.Errorf("sync ResourceSlice informer: %w", context.Cause(ctx))
}
}
logger.V(3).Info("ResourceSlice informer has synced")
return nil
}
// Seed the
// run runs in the background, processing items from the work queue.
func (c *Controller) run(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
@ -295,10 +320,13 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
logger := klog.FromContext(ctx)
// Gather information about the actual and desired state.
// TODO: index by pool name.
var slices []*resourceapi.ResourceSlice
for _, obj := range c.sliceStore.List() {
if slice, ok := obj.(*resourceapi.ResourceSlice); ok && slice.Spec.Pool.Name == poolName {
objs, err := c.sliceStore.ByIndex(poolNameIndex, poolName)
if err != nil {
return fmt.Errorf("retrieve ResourceSlice objects: %w", err)
}
for _, obj := range objs {
if slice, ok := obj.(*resourceapi.ResourceSlice); ok {
slices = append(slices, slice)
}
}
@ -346,6 +374,11 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
}
slices = currentSlices
// Sort by name to ensure that keeping only the first slice is deterministic.
sort.Slice(slices, func(i, j int) bool {
return slices[i].Name < slices[j].Name
})
if pool, ok := resources.Pools[poolName]; ok {
if pool.Generation > generation {
generation = pool.Generation
@ -397,6 +430,8 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
logger.V(5).Info("Removing resource slices after pool removal", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices))
obsoleteSlices = append(obsoleteSlices, slices...)
// No need to create or update the slices.
slices = nil
}
// Remove stale slices.
@ -420,7 +455,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
// TODO: switch to SSA once unit testing supports it.
logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("delete resource slice: %w", err)
return fmt.Errorf("update resource slice: %w", err)
}
}

View File

@ -0,0 +1,270 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourceslice
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/ptr"
)
func TestControllerSyncPool(t *testing.T) {
var (
node = "node"
nodeUID = types.UID("node-uid")
driveName = "driver"
poolName = "pool"
poolName1 = "pool-1"
deviceName = "device"
resourceSlice1 = "resource-slice-1"
resourceSlice2 = "resource-slice-2"
resourceSlice3 = "resource-slice-3"
)
testCases := map[string]struct {
nodeUID types.UID
initialObjects []runtime.Object
poolName string
inputDriverResources *DriverResources
expectedResourceSlices []resourceapi.ResourceSlice
}{
"single-resourceslice-existing": {
nodeUID: nodeUID,
initialObjects: []runtime.Object{
newResourceSlice(resourceSlice1, node, driveName, poolName, 1),
},
poolName: poolName,
inputDriverResources: &DriverResources{
Pools: map[string]Pool{
poolName: {
Devices: []resourceapi.Device{
{
Name: deviceName,
},
},
},
},
},
expectedResourceSlices: []resourceapi.ResourceSlice{
*newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1),
},
},
"single-resourceslice-existing-with-nodeUID-empty": {
nodeUID: "",
initialObjects: []runtime.Object{
newNode(node, nodeUID),
newResourceSlice(resourceSlice1, node, driveName, poolName, 1),
},
poolName: poolName,
inputDriverResources: &DriverResources{
Pools: map[string]Pool{
poolName: {
Devices: []resourceapi.Device{
{
Name: deviceName,
},
},
},
},
},
expectedResourceSlices: []resourceapi.ResourceSlice{
*newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1),
},
},
"multiple-resourceslices-existing": {
nodeUID: nodeUID,
initialObjects: []runtime.Object{
newResourceSlice(resourceSlice1, node, driveName, poolName, 1),
newResourceSlice(resourceSlice2, node, driveName, poolName, 1),
newResourceSlice(resourceSlice3, node, driveName, poolName, 1),
},
poolName: poolName,
inputDriverResources: &DriverResources{
Pools: map[string]Pool{
poolName: {
Devices: []resourceapi.Device{
{
Name: deviceName,
},
},
},
},
},
expectedResourceSlices: []resourceapi.ResourceSlice{
*newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1),
},
},
"no-resourceslices-existing": {
nodeUID: nodeUID,
initialObjects: []runtime.Object{},
poolName: poolName,
inputDriverResources: &DriverResources{
Pools: map[string]Pool{
poolName: {
Devices: []resourceapi.Device{
{
Name: deviceName,
},
},
},
},
},
expectedResourceSlices: []resourceapi.ResourceSlice{
*newExpectResourceSlice(node+"-"+driveName+"-", node, string(nodeUID), driveName, poolName, deviceName, true, 0),
},
},
"single-resourceslice-existing-with-different-resource-pool-generation": {
nodeUID: nodeUID,
initialObjects: []runtime.Object{
newResourceSlice(resourceSlice1, node, driveName, poolName, 2),
newResourceSlice(resourceSlice2, node, driveName, poolName, 1),
newResourceSlice(resourceSlice3, node, driveName, poolName, 1),
},
poolName: poolName,
inputDriverResources: &DriverResources{
Pools: map[string]Pool{
poolName: {
Generation: 3,
Devices: []resourceapi.Device{
{
Name: deviceName,
},
},
},
},
},
expectedResourceSlices: []resourceapi.ResourceSlice{
*newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 3),
},
},
"single-resourceslice-existing-with-mismatching-resource-pool-name": {
nodeUID: nodeUID,
initialObjects: []runtime.Object{
newResourceSlice(resourceSlice1, node, driveName, poolName, 1),
},
poolName: poolName,
inputDriverResources: &DriverResources{
Pools: map[string]Pool{
poolName1: {
Generation: 1,
Devices: []resourceapi.Device{
{
Name: deviceName,
},
},
},
},
},
expectedResourceSlices: nil,
},
}
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
kubeClient := fake.NewSimpleClientset(test.initialObjects...)
owner := Owner{
APIVersion: "v1",
Kind: "Node",
Name: node,
UID: test.nodeUID,
}
ctrl, err := newController(ctx, kubeClient, driveName, owner, test.inputDriverResources)
require.NoError(t, err, "unexpected controller creation error")
defer ctrl.Stop()
err = ctrl.syncPool(ctx, test.poolName)
require.NoError(t, err, "unexpected syncPool error")
// Check ResourceSlices
resourceSlices, err := kubeClient.ResourceV1alpha3().ResourceSlices().List(ctx, metav1.ListOptions{})
require.NoError(t, err, "list resource slices")
assert.Equal(t, test.expectedResourceSlices, resourceSlices.Items, "unexpected ResourceSlices")
})
}
}
func newNode(name string, uid types.UID) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: uid,
},
}
}
func newResourceSlice(name, nodeName, driveName, poolName string, poolGeneration int64) *resourceapi.ResourceSlice {
return &resourceapi.ResourceSlice{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(name),
},
Spec: resourceapi.ResourceSliceSpec{
NodeName: nodeName,
Driver: driveName,
Pool: resourceapi.ResourcePool{
Name: poolName,
ResourceSliceCount: 1,
Generation: poolGeneration,
},
},
}
}
func newExpectResourceSlice(name, nodeName, nodeUID, driveName, poolName, deviceName string, generateName bool, poolGeneration int64) *resourceapi.ResourceSlice {
resourceSlice := &resourceapi.ResourceSlice{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeUID),
Controller: ptr.To(true),
},
},
},
Spec: resourceapi.ResourceSliceSpec{
NodeName: nodeName,
Driver: driveName,
Pool: resourceapi.ResourcePool{
Name: poolName,
ResourceSliceCount: 1,
Generation: poolGeneration,
},
Devices: []resourceapi.Device{{Name: deviceName}},
},
}
if generateName {
resourceSlice.ObjectMeta.GenerateName = name
} else {
resourceSlice.ObjectMeta.Name = name
resourceSlice.ObjectMeta.UID = types.UID(name)
}
return resourceSlice
}

View File

@ -204,7 +204,7 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
return ex, nil
}
// stop ensures that all servers are stopped and resources freed.
// Stop ensures that all servers are stopped and resources freed.
func (ex *ExamplePlugin) Stop() {
ex.d.Stop()
}