Merge pull request #126868 from googs1025/dra/resourceslice_cleanup

dra(resourceslice): resourceslice controller enhancements
Kubernetes Prow Robot 2024-09-13 18:11:12 +01:00 committed by GitHub
commit 253013ed61
4 changed files with 344 additions and 36 deletions

View File

@@ -406,7 +406,10 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) error {
 	controllerLogger := klog.FromContext(controllerCtx)
 	controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")
 	controllerCtx = klog.NewContext(controllerCtx, controllerLogger)
-	d.resourceSliceController = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources)
+	var err error
+	if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources); err != nil {
+		return fmt.Errorf("start ResourceSlice controller: %w", err)
+	}
 	return nil
 }
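
With StartController now returning (*Controller, error), callers must check the error instead of possibly receiving a nil controller. A minimal sketch of the calling pattern for a driver that starts the controller itself, assuming the staging import path k8s.io/dynamic-resource-allocation/resourceslice and an illustrative driver name (neither taken from this PR):

package example

import (
	"context"
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/resourceslice"
)

// startPublishing starts ResourceSlice publishing for a driver and propagates
// the error that StartController now returns (for example, when kubeClient is nil).
func startPublishing(ctx context.Context, kubeClient kubernetes.Interface, owner resourceslice.Owner, resources *resourceslice.DriverResources) (*resourceslice.Controller, error) {
	ctrl, err := resourceslice.StartController(ctx, kubeClient, "example.com/driver", owner, resources)
	if err != nil {
		return nil, fmt.Errorf("start ResourceSlice controller: %w", err)
	}
	return ctrl, nil
}

Stop() remains the shutdown path, so a successful return is typically paired with a deferred ctrl.Stop().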

View File

@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sort"
 	"sync"
 	"time"
@@ -44,6 +45,10 @@ const (
 	// resyncPeriod for informer
 	// TODO (https://github.com/kubernetes/kubernetes/issues/123688): disable?
 	resyncPeriod = time.Duration(10 * time.Minute)
+
+	// poolNameIndex is the name for the ResourceSlice store's index function,
+	// which is to index by ResourceSlice.Spec.Pool.Name
+	poolNameIndex = "poolName"
 )
// Controller synchronizes information about resources of one driver with
@@ -58,7 +63,7 @@ type Controller struct {
 	wg sync.WaitGroup
 	// The queue is keyed with the pool name that needs work.
 	queue workqueue.TypedRateLimitingInterface[string]
-	sliceStore cache.Store
+	sliceStore cache.Indexer

 	mutex sync.RWMutex
@@ -109,24 +114,11 @@ type Owner struct {
 // the controller is inactive. This can happen when kubelet is run stand-alone
 // without an apiserver. In that case we can't and don't need to publish
 // ResourceSlices.
-func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) *Controller {
-	if kubeClient == nil {
-		return nil
-	}
-
+func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) {
 	logger := klog.FromContext(ctx)
-	ctx, cancel := context.WithCancelCause(ctx)
-
-	c := &Controller{
-		cancel:     cancel,
-		kubeClient: kubeClient,
-		driver:     driver,
-		owner:      owner,
-		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
-			workqueue.DefaultTypedControllerRateLimiter[string](),
-			workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
-		),
-		resources: resources,
-	}
+	c, err := newController(ctx, kubeClient, driver, owner, resources)
+	if err != nil {
+		return nil, fmt.Errorf("create controller: %w", err)
+	}

 	logger.V(3).Info("Starting")
@@ -142,7 +134,7 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) *Controller {
 		c.queue.Add(poolName)
 	}

-	return c
+	return c, nil
 }
// Stop cancels all background activity and blocks until the controller has stopped.
@@ -175,9 +167,34 @@ func (c *Controller) Update(resources *DriverResources) {
 	}
 }

-// run is running in the background. It handles blocking initialization (like
-// syncing the informer) and then syncs the actual with the desired state.
-func (c *Controller) run(ctx context.Context) {
+// newController creates a new controller.
+func newController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) {
+	if kubeClient == nil {
+		return nil, fmt.Errorf("kubeClient is nil")
+	}
+
+	ctx, cancel := context.WithCancelCause(ctx)
+	c := &Controller{
+		cancel:     cancel,
+		kubeClient: kubeClient,
+		driver:     driver,
+		owner:      owner,
+		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+			workqueue.DefaultTypedControllerRateLimiter[string](),
+			workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
+		),
+		resources: resources,
+	}
+
+	if err := c.initInformer(ctx); err != nil {
+		return nil, err
+	}
+
+	return c, nil
+}
+
+// initInformer initializes the informer used to watch for changes to the resources slice.
+func (c *Controller) initInformer(ctx context.Context) error {
 	logger := klog.FromContext(ctx)

 	// We always filter by driver name, by node name only for node-local resources.
@@ -185,10 +202,18 @@ func (c *Controller) run(ctx context.Context) {
 	if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
 		selector[resourceapi.ResourceSliceSelectorNodeName] = c.owner.Name
 	}
-	informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
+	informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, cache.Indexers{
+		poolNameIndex: func(obj interface{}) ([]string, error) {
+			slice, ok := obj.(*resourceapi.ResourceSlice)
+			if !ok {
+				return []string{}, nil
+			}
+			return []string{slice.Spec.Pool.Name}, nil
+		},
+	}, func(options *metav1.ListOptions) {
 		options.FieldSelector = selector.String()
 	})
-	c.sliceStore = informer.GetStore()
+	c.sliceStore = informer.GetIndexer()
 	handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj any) {
 			slice, ok := obj.(*resourceapi.ResourceSlice)
@@ -228,10 +253,8 @@
 		},
 	})
 	if err != nil {
-		logger.Error(err, "Registering event handler on the ResourceSlice informer failed, disabling resource monitoring")
-		return
+		return fmt.Errorf("registering event handler on the ResourceSlice informer: %w", err)
 	}

 	// Start informer and wait for our cache to be populated.
 	logger.V(3).Info("Starting ResourceSlice informer and waiting for it to sync")
 	c.wg.Add(1)
@@ -245,13 +268,15 @@
 		select {
 		case <-time.After(time.Second):
 		case <-ctx.Done():
-			return
+			return fmt.Errorf("sync ResourceSlice informer: %w", context.Cause(ctx))
 		}
 	}
 	logger.V(3).Info("ResourceSlice informer has synced")
-
-	// Seed the
+	return nil
+}

+// run is running in the background.
+func (c *Controller) run(ctx context.Context) {
 	for c.processNextWorkItem(ctx) {
 	}
 }
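
The sync wait in the hunk above now surfaces the cancellation cause via context.Cause instead of returning silently. A standalone sketch of that pattern, with hasSynced standing in for the informer handler's HasSynced method (names here are illustrative):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForSync polls hasSynced until it reports true or ctx is cancelled,
// in which case the cancellation cause is wrapped into the returned error.
func waitForSync(ctx context.Context, hasSynced func() bool) error {
	for !hasSynced() {
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return fmt.Errorf("sync informer: %w", context.Cause(ctx))
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(fmt.Errorf("shutting down"))
	// Prints: sync informer: shutting down
	fmt.Println(waitForSync(ctx, func() bool { return false }))
}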
@@ -295,10 +320,13 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 	logger := klog.FromContext(ctx)

 	// Gather information about the actual and desired state.
-	// TODO: index by pool name.
 	var slices []*resourceapi.ResourceSlice
-	for _, obj := range c.sliceStore.List() {
-		if slice, ok := obj.(*resourceapi.ResourceSlice); ok && slice.Spec.Pool.Name == poolName {
+	objs, err := c.sliceStore.ByIndex(poolNameIndex, poolName)
+	if err != nil {
+		return fmt.Errorf("retrieve ResourceSlice objects: %w", err)
+	}
+	for _, obj := range objs {
+		if slice, ok := obj.(*resourceapi.ResourceSlice); ok {
 			slices = append(slices, slice)
 		}
 	}
@@ -346,6 +374,11 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 	}
 	slices = currentSlices

+	// Sort by name to ensure that keeping only the first slice is deterministic.
+	sort.Slice(slices, func(i, j int) bool {
+		return slices[i].Name < slices[j].Name
+	})
+
 	if pool, ok := resources.Pools[poolName]; ok {
 		if pool.Generation > generation {
 			generation = pool.Generation
@@ -397,6 +430,8 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 		logger.V(5).Info("Removing resource slices after pool removal", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices))
 		obsoleteSlices = append(obsoleteSlices, slices...)
+		// No need to create or update the slices.
+		slices = nil
 	}

 	// Remove stale slices.
@@ -420,7 +455,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 		// TODO: switch to SSA once unit testing supports it.
 		logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
 		if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
-			return fmt.Errorf("delete resource slice: %w", err)
+			return fmt.Errorf("update resource slice: %w", err)
 		}
 	}
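
For context, the poolName index added in this file can be exercised with a plain client-go Indexer outside the controller; ByIndex then takes over the work of the earlier List-and-filter loop in syncPool. A minimal sketch with illustrative slice and pool names (the real controller gets its indexer from the informer):

package main

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

const poolNameIndex = "poolName"

func main() {
	// An indexer keyed the same way as the controller's informer store:
	// one index named "poolName" whose value is Spec.Pool.Name.
	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		poolNameIndex: func(obj interface{}) ([]string, error) {
			slice, ok := obj.(*resourceapi.ResourceSlice)
			if !ok {
				return []string{}, nil
			}
			return []string{slice.Spec.Pool.Name}, nil
		},
	})

	_ = store.Add(&resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "slice-a"},
		Spec:       resourceapi.ResourceSliceSpec{Pool: resourceapi.ResourcePool{Name: "pool"}},
	})
	_ = store.Add(&resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "slice-b"},
		Spec:       resourceapi.ResourceSliceSpec{Pool: resourceapi.ResourcePool{Name: "other-pool"}},
	})

	// ByIndex returns only the objects whose index value matches.
	objs, err := store.ByIndex(poolNameIndex, "pool")
	if err != nil {
		panic(err)
	}
	for _, obj := range objs {
		fmt.Println(obj.(*resourceapi.ResourceSlice).Name) // slice-a
	}
}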

View File

@@ -0,0 +1,270 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourceslice

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	v1 "k8s.io/api/core/v1"
	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/klog/v2/ktesting"
	"k8s.io/utils/ptr"
)

func TestControllerSyncPool(t *testing.T) {
	var (
		node           = "node"
		nodeUID        = types.UID("node-uid")
		driveName      = "driver"
		poolName       = "pool"
		poolName1      = "pool-1"
		deviceName     = "device"
		resourceSlice1 = "resource-slice-1"
		resourceSlice2 = "resource-slice-2"
		resourceSlice3 = "resource-slice-3"
	)

	testCases := map[string]struct {
		nodeUID                types.UID
		initialObjects         []runtime.Object
		poolName               string
		inputDriverResources   *DriverResources
		expectedResourceSlices []resourceapi.ResourceSlice
	}{
		"single-resourceslice-existing": {
			nodeUID: nodeUID,
			initialObjects: []runtime.Object{
				newResourceSlice(resourceSlice1, node, driveName, poolName, 1),
			},
			poolName: poolName,
			inputDriverResources: &DriverResources{
				Pools: map[string]Pool{
					poolName: {
						Devices: []resourceapi.Device{
							{
								Name: deviceName,
							},
						},
					},
				},
			},
			expectedResourceSlices: []resourceapi.ResourceSlice{
				*newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1),
			},
		},
		"single-resourceslice-existing-with-nodeUID-empty": {
			nodeUID: "",
			initialObjects: []runtime.Object{
				newNode(node, nodeUID),
				newResourceSlice(resourceSlice1, node, driveName, poolName, 1),
			},
			poolName: poolName,
			inputDriverResources: &DriverResources{
				Pools: map[string]Pool{
					poolName: {
						Devices: []resourceapi.Device{
							{
								Name: deviceName,
							},
						},
					},
				},
			},
			expectedResourceSlices: []resourceapi.ResourceSlice{
				*newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1),
			},
		},
		"multiple-resourceslices-existing": {
			nodeUID: nodeUID,
			initialObjects: []runtime.Object{
				newResourceSlice(resourceSlice1, node, driveName, poolName, 1),
				newResourceSlice(resourceSlice2, node, driveName, poolName, 1),
				newResourceSlice(resourceSlice3, node, driveName, poolName, 1),
			},
			poolName: poolName,
			inputDriverResources: &DriverResources{
				Pools: map[string]Pool{
					poolName: {
						Devices: []resourceapi.Device{
							{
								Name: deviceName,
							},
						},
					},
				},
			},
			expectedResourceSlices: []resourceapi.ResourceSlice{
				*newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 1),
			},
		},
		"no-resourceslices-existing": {
			nodeUID:        nodeUID,
			initialObjects: []runtime.Object{},
			poolName:       poolName,
			inputDriverResources: &DriverResources{
				Pools: map[string]Pool{
					poolName: {
						Devices: []resourceapi.Device{
							{
								Name: deviceName,
							},
						},
					},
				},
			},
			expectedResourceSlices: []resourceapi.ResourceSlice{
				*newExpectResourceSlice(node+"-"+driveName+"-", node, string(nodeUID), driveName, poolName, deviceName, true, 0),
			},
		},
		"single-resourceslice-existing-with-different-resource-pool-generation": {
			nodeUID: nodeUID,
			initialObjects: []runtime.Object{
				newResourceSlice(resourceSlice1, node, driveName, poolName, 2),
				newResourceSlice(resourceSlice2, node, driveName, poolName, 1),
				newResourceSlice(resourceSlice3, node, driveName, poolName, 1),
			},
			poolName: poolName,
			inputDriverResources: &DriverResources{
				Pools: map[string]Pool{
					poolName: {
						Generation: 3,
						Devices: []resourceapi.Device{
							{
								Name: deviceName,
							},
						},
					},
				},
			},
			expectedResourceSlices: []resourceapi.ResourceSlice{
				*newExpectResourceSlice(resourceSlice1, node, string(nodeUID), driveName, poolName, deviceName, false, 3),
			},
		},
		"single-resourceslice-existing-with-mismatching-resource-pool-name": {
			nodeUID: nodeUID,
			initialObjects: []runtime.Object{
				newResourceSlice(resourceSlice1, node, driveName, poolName, 1),
			},
			poolName: poolName,
			inputDriverResources: &DriverResources{
				Pools: map[string]Pool{
					poolName1: {
						Generation: 1,
						Devices: []resourceapi.Device{
							{
								Name: deviceName,
							},
						},
					},
				},
			},
			expectedResourceSlices: nil,
		},
	}

	for name, test := range testCases {
		t.Run(name, func(t *testing.T) {
			_, ctx := ktesting.NewTestContext(t)
			kubeClient := fake.NewSimpleClientset(test.initialObjects...)
			owner := Owner{
				APIVersion: "v1",
				Kind:       "Node",
				Name:       node,
				UID:        test.nodeUID,
			}
			ctrl, err := newController(ctx, kubeClient, driveName, owner, test.inputDriverResources)
			defer ctrl.Stop()
			require.NoError(t, err, "unexpected controller creation error")

			err = ctrl.syncPool(ctx, test.poolName)
			require.NoError(t, err, "unexpected syncPool error")

			// Check ResourceSlices
			resourceSlices, err := kubeClient.ResourceV1alpha3().ResourceSlices().List(ctx, metav1.ListOptions{})
			require.NoError(t, err, "list resource slices")
			assert.Equal(t, test.expectedResourceSlices, resourceSlices.Items, "unexpected ResourceSlices")
		})
	}
}

func newNode(name string, uid types.UID) *v1.Node {
	return &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
			UID:  uid,
		},
	}
}

func newResourceSlice(name, nodeName, driveName, poolName string, poolGeneration int64) *resourceapi.ResourceSlice {
	return &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
			UID:  types.UID(name),
		},
		Spec: resourceapi.ResourceSliceSpec{
			NodeName: nodeName,
			Driver:   driveName,
			Pool: resourceapi.ResourcePool{
				Name:               poolName,
				ResourceSliceCount: 1,
				Generation:         poolGeneration,
			},
		},
	}
}

func newExpectResourceSlice(name, nodeName, nodeUID, driveName, poolName, deviceName string, generateName bool, poolGeneration int64) *resourceapi.ResourceSlice {
	resourceSlice := &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: "v1",
					Kind:       "Node",
					Name:       nodeName,
					UID:        types.UID(nodeUID),
					Controller: ptr.To(true),
				},
			},
		},
		Spec: resourceapi.ResourceSliceSpec{
			NodeName: nodeName,
			Driver:   driveName,
			Pool: resourceapi.ResourcePool{
				Name:               poolName,
				ResourceSliceCount: 1,
				Generation:         poolGeneration,
			},
			Devices: []resourceapi.Device{{Name: deviceName}},
		},
	}

	if generateName {
		resourceSlice.ObjectMeta.GenerateName = name
	} else {
		resourceSlice.ObjectMeta.Name = name
		resourceSlice.ObjectMeta.UID = types.UID(name)
	}
	return resourceSlice
}

View File

@@ -204,7 +204,7 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
 	return ex, nil
 }

-// stop ensures that all servers are stopped and resources freed.
+// Stop ensures that all servers are stopped and resources freed.
 func (ex *ExamplePlugin) Stop() {
 	ex.d.Stop()
 }