mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-31 08:36:16 +00:00
Merge pull request #128275 from pohly/dra-resourceslice-controller-multiple-slices
DRA resourceslice controller: support publishing multiple slices
This commit is contained in:
@@ -0,0 +1,247 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workqueue
|
||||
|
||||
import (
|
||||
"maps"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
)
|
||||
|
||||
// TODO (pohly): move this to k8s.io/client-go/util/workqueue/mockqueue.go
|
||||
// if it turns out to be generally useful. Doc comments are already written
|
||||
// as if the code was there.
|
||||
|
||||
// MockQueue is an implementation of [TypedRateLimitingInterface] which
|
||||
// can be used to test a function which pulls work items out of a queue
|
||||
// and processes them. It is thread-safe.
|
||||
//
|
||||
// A null instance is directly usable. The usual usage is:
|
||||
//
|
||||
// var m workqueue.Mock[string]
|
||||
// m.SyncOne("some-item", func(queue workqueue.TypedRateLimitingInterface[string]) { ... } )
|
||||
// if diff := cmp.Diff(workqueue.MockState[string]{}, m.State()); diff != "" {
|
||||
// t.Errorf("unexpected state of mock work queue after sync (-want, +got):\n%s", diff)
|
||||
// }
|
||||
//
|
||||
// All slices get reset to nil when they become empty, so there are no spurious
|
||||
// differences because of nil vs. empty slice.
|
||||
type Mock[T comparable] struct {
|
||||
mutex sync.Mutex
|
||||
state MockState[T]
|
||||
}
|
||||
|
||||
type MockState[T comparable] struct {
|
||||
// Ready contains the items which are ready for processing.
|
||||
Ready []T
|
||||
|
||||
// InFlight contains the items which are currently being processed (= Get
|
||||
// was called, Done not yet).
|
||||
InFlight []T
|
||||
|
||||
// MismatchedDone contains the items for which Done was called without
|
||||
// a matching Get.
|
||||
MismatchedDone []T
|
||||
|
||||
// Later contains the items which are meant to be added to the queue after
|
||||
// a certain delay (= AddAfter was called for them). They appear in the
|
||||
// order in which AddAfter got called.
|
||||
Later []MockDelayedItem[T]
|
||||
|
||||
// Failures contains the items and their retry count which failed to be
|
||||
// processed (AddRateLimited called at least once, Forget not yet).
|
||||
// The retry count is always larger than zero.
|
||||
Failures map[T]int
|
||||
|
||||
// ShutDownCalled tracks how often ShutDown got called.
|
||||
ShutDownCalled int
|
||||
|
||||
// ShutDownWithDrainCalled tracks how often ShutDownWithDrain got called.
|
||||
ShutDownWithDrainCalled int
|
||||
}
|
||||
|
||||
// DeepCopy takes a snapshot of all slices. It cannot do a deep copy of the items in those slices,
|
||||
// but typically those keys are immutable.
|
||||
func (m MockState[T]) DeepCopy() *MockState[T] {
|
||||
m.Ready = slices.Clone(m.Ready)
|
||||
m.InFlight = slices.Clone(m.InFlight)
|
||||
m.MismatchedDone = slices.Clone(m.MismatchedDone)
|
||||
m.Later = slices.Clone(m.Later)
|
||||
m.Failures = maps.Clone(m.Failures)
|
||||
return &m
|
||||
}
|
||||
|
||||
// MockDelayedItem is an item which was queue for later processing.
|
||||
type MockDelayedItem[T comparable] struct {
|
||||
Item T
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
// SyncOne adds the item to the work queue and calls sync.
|
||||
// That sync function can pull one or more items from the work
|
||||
// queue until the queue is empty. Then it is told that the queue
|
||||
// is shutting down, which must cause it to return.
|
||||
//
|
||||
// The test can then retrieve the state of the queue to check the result.
|
||||
func (m *Mock[T]) SyncOne(item T, sync func(workqueue.TypedRateLimitingInterface[T])) {
|
||||
// sync must run with the mutex not locked.
|
||||
defer sync(m)
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
m.state.Ready = append(m.state.Ready, item)
|
||||
}
|
||||
|
||||
// State returns the current state of the queue.
|
||||
func (m *Mock[T]) State() MockState[T] {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
return *m.state.DeepCopy()
|
||||
}
|
||||
|
||||
// Add implements [TypedInterface].
|
||||
func (m *Mock[T]) Add(item T) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
if !slices.Contains(m.state.Ready, item) {
|
||||
m.state.Ready = append(m.state.Ready, item)
|
||||
}
|
||||
}
|
||||
|
||||
// Len implements [TypedInterface].
|
||||
func (m *Mock[T]) Len() int {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
return len(m.state.Ready)
|
||||
}
|
||||
|
||||
// Get implements [TypedInterface].
|
||||
func (m *Mock[T]) Get() (item T, shutdown bool) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
if len(m.state.Ready) == 0 {
|
||||
shutdown = true
|
||||
return
|
||||
}
|
||||
item = m.state.Ready[0]
|
||||
m.state.Ready = m.state.Ready[1:]
|
||||
if len(m.state.Ready) == 0 {
|
||||
m.state.Ready = nil
|
||||
}
|
||||
m.state.InFlight = append(m.state.InFlight, item)
|
||||
return item, false
|
||||
}
|
||||
|
||||
// Done implements [TypedInterface].
|
||||
func (m *Mock[T]) Done(item T) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
index := slices.Index(m.state.InFlight, item)
|
||||
if index < 0 {
|
||||
m.state.MismatchedDone = append(m.state.MismatchedDone, item)
|
||||
}
|
||||
m.state.InFlight = slices.Delete(m.state.InFlight, index, index+1)
|
||||
if len(m.state.InFlight) == 0 {
|
||||
m.state.InFlight = nil
|
||||
}
|
||||
}
|
||||
|
||||
// ShutDown implements [TypedInterface].
|
||||
func (m *Mock[T]) ShutDown() {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
m.state.ShutDownCalled++
|
||||
}
|
||||
|
||||
// ShutDownWithDrain implements [TypedInterface].
|
||||
func (m *Mock[T]) ShutDownWithDrain() {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
m.state.ShutDownWithDrainCalled++
|
||||
}
|
||||
|
||||
// ShuttingDown implements [TypedInterface].
|
||||
func (m *Mock[T]) ShuttingDown() bool {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
return m.state.ShutDownCalled > 0 || m.state.ShutDownWithDrainCalled > 0
|
||||
}
|
||||
|
||||
// AddAfter implements [TypedDelayingInterface.AddAfter]
|
||||
func (m *Mock[T]) AddAfter(item T, duration time.Duration) {
|
||||
if duration == 0 {
|
||||
m.Add(item)
|
||||
return
|
||||
}
|
||||
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
for i := range m.state.Later {
|
||||
if m.state.Later[i].Item == item {
|
||||
// https://github.com/kubernetes/client-go/blob/270e5ab1714527c455865953da8ceba2810dbb50/util/workqueue/delaying_queue.go#L340-L349
|
||||
// only shortens the delay for an existing item. It does not make it longer.
|
||||
if m.state.Later[i].Duration > duration {
|
||||
m.state.Later[i].Duration = duration
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
m.state.Later = append(m.state.Later, MockDelayedItem[T]{Item: item, Duration: duration})
|
||||
}
|
||||
|
||||
// AddRateLimited implements [TypedRateLimitingInterface.AddRateLimited].
|
||||
func (m *Mock[T]) AddRateLimited(item T) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
if m.state.Failures == nil {
|
||||
m.state.Failures = make(map[T]int)
|
||||
}
|
||||
m.state.Failures[item]++
|
||||
}
|
||||
|
||||
// Forget implements [TypedRateLimitingInterface.Forget].
|
||||
func (m *Mock[T]) Forget(item T) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
if m.state.Failures == nil {
|
||||
return
|
||||
}
|
||||
delete(m.state.Failures, item)
|
||||
}
|
||||
|
||||
// NumRequeues implements [TypedRateLimitingInterface.NumRequeues].
|
||||
func (m *Mock[T]) NumRequeues(item T) int {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
return m.state.Failures[item]
|
||||
}
|
@@ -393,7 +393,9 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e
|
||||
driverResources := &resourceslice.DriverResources{
|
||||
Pools: map[string]resourceslice.Pool{
|
||||
d.nodeName: {
|
||||
Devices: resources.Devices,
|
||||
Slices: []resourceslice.Slice{{
|
||||
Devices: resources.Devices,
|
||||
}},
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -407,7 +409,13 @@ func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) e
|
||||
controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")
|
||||
controllerCtx = klog.NewContext(controllerCtx, controllerLogger)
|
||||
var err error
|
||||
if d.resourceSliceController, err = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources); err != nil {
|
||||
if d.resourceSliceController, err = resourceslice.StartController(controllerCtx,
|
||||
resourceslice.Options{
|
||||
DriverName: d.driverName,
|
||||
KubeClient: d.kubeClient,
|
||||
Owner: &owner,
|
||||
Resources: driverResources,
|
||||
}); err != nil {
|
||||
return fmt.Errorf("start ResourceSlice controller: %w", err)
|
||||
}
|
||||
return nil
|
||||
|
@@ -20,19 +20,21 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
resourceapi "k8s.io/api/resource/v1alpha3"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
resourceinformers "k8s.io/client-go/informers/resource/v1alpha3"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
@@ -49,6 +51,20 @@ const (
|
||||
// poolNameIndex is the name for the ResourceSlice store's index function,
|
||||
// which is to index by ResourceSlice.Spec.Pool.Name
|
||||
poolNameIndex = "poolName"
|
||||
|
||||
// Including adds in the mutation cache is not safe: We could add a slice, store it,
|
||||
// and then the slice gets deleted without the informer hearing anything about that.
|
||||
// Then the obsolete slice remains in the mutation cache.
|
||||
//
|
||||
// To mitigate this, we use a TTL and check a pool again once added slices expire.
|
||||
defaultMutationCacheTTL = time.Minute
|
||||
|
||||
// defaultSyncDelay defines how long to wait between receiving the most recent
|
||||
// informer event and syncing again. This is long enough that the informer cache
|
||||
// should be up-to-date (matter mostly for deletes because an out-dated cache
|
||||
// causes redundant delete API calls) and not too long that a human mistake
|
||||
// doesn't get fixed while that human is waiting for it.
|
||||
defaultSyncDelay = 30 * time.Second
|
||||
)
|
||||
|
||||
// Controller synchronizes information about resources of one driver with
|
||||
@@ -57,13 +73,20 @@ const (
|
||||
// controller as part of its kubelet plugin.
|
||||
type Controller struct {
|
||||
cancel func(cause error)
|
||||
driver string
|
||||
owner Owner
|
||||
driverName string
|
||||
owner *Owner
|
||||
kubeClient kubernetes.Interface
|
||||
wg sync.WaitGroup
|
||||
// The queue is keyed with the pool name that needs work.
|
||||
queue workqueue.TypedRateLimitingInterface[string]
|
||||
sliceStore cache.Indexer
|
||||
queue workqueue.TypedRateLimitingInterface[string]
|
||||
sliceStore cache.MutationCache
|
||||
mutationCacheTTL time.Duration
|
||||
syncDelay time.Duration
|
||||
|
||||
// Must use atomic access...
|
||||
numCreates int64
|
||||
numUpdates int64
|
||||
numDeletes int64
|
||||
|
||||
mutex sync.RWMutex
|
||||
|
||||
@@ -94,7 +117,28 @@ type Pool struct {
|
||||
// by the controller.
|
||||
Generation int64
|
||||
|
||||
// Device names must be unique inside the pool.
|
||||
// Slices is a list of all ResourceSlices that the driver
|
||||
// wants to publish for this pool. The driver must ensure
|
||||
// that each resulting slice is valid. See the API
|
||||
// definition for details, in particular the limit on
|
||||
// the number of devices.
|
||||
//
|
||||
// If slices are not valid, then the controller will
|
||||
// log errors produced by the apiserver.
|
||||
//
|
||||
// Drivers should publish at least one slice for each
|
||||
// pool that they normally manage, even if that slice
|
||||
// is empty. "Empty pool" is different from "no pool"
|
||||
// because it shows that the driver is up-and-running
|
||||
// and simply doesn't have any devices.
|
||||
Slices []Slice
|
||||
}
|
||||
|
||||
// +k8s:deepcopy-gen=true
|
||||
|
||||
// Slice is turned into one ResourceSlice by the controller.
|
||||
type Slice struct {
|
||||
// Devices lists all devices which are part of the slice.
|
||||
Devices []resourceapi.Device
|
||||
}
|
||||
|
||||
@@ -110,19 +154,9 @@ type Owner struct {
|
||||
}
|
||||
|
||||
// StartController constructs a new controller and starts it.
|
||||
// If the owner is a v1.Node, then the NodeName field in the
|
||||
// ResourceSlice objects is set and used to identify objects
|
||||
// managed by the controller. The UID is not needed in that
|
||||
// case, the controller will determine it automatically.
|
||||
//
|
||||
// If a kubeClient is provided, then it synchronizes ResourceSlices
|
||||
// with the resource information provided by plugins. Without it,
|
||||
// the controller is inactive. This can happen when kubelet is run stand-alone
|
||||
// without an apiserver. In that case we can't and don't need to publish
|
||||
// ResourceSlices.
|
||||
func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) {
|
||||
func StartController(ctx context.Context, options Options) (*Controller, error) {
|
||||
logger := klog.FromContext(ctx)
|
||||
c, err := newController(ctx, kubeClient, driver, owner, resources)
|
||||
c, err := newController(ctx, options)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create controller: %w", err)
|
||||
}
|
||||
@@ -134,15 +168,49 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, drive
|
||||
defer logger.V(3).Info("Stopping")
|
||||
c.run(ctx)
|
||||
}()
|
||||
|
||||
// Sync each pool once.
|
||||
for poolName := range resources.Pools {
|
||||
c.queue.Add(poolName)
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Options contains various optional settings for [StartController].
|
||||
type Options struct {
|
||||
// DriverName is the required name of the DRA driver.
|
||||
DriverName string
|
||||
|
||||
// KubeClient is used to read Node objects (if necessary) and to access
|
||||
// ResourceSlices. It must be specified.
|
||||
KubeClient kubernetes.Interface
|
||||
|
||||
// If the owner is a v1.Node, then the NodeName field in the
|
||||
// ResourceSlice objects is set and used to identify objects
|
||||
// managed by the controller. The UID is not needed in that
|
||||
// case, the controller will determine it automatically.
|
||||
//
|
||||
// The owner must be cluster-scoped. This is not always possible,
|
||||
// therefore it is optional. A driver without a owner must take
|
||||
// care that remaining slices get deleted manually as part of
|
||||
// a driver uninstall because garbage collection won't work.
|
||||
Owner *Owner
|
||||
|
||||
// This is the initial desired set of slices.
|
||||
Resources *DriverResources
|
||||
|
||||
// Queue can be used to override the default work queue implementation.
|
||||
Queue workqueue.TypedRateLimitingInterface[string]
|
||||
|
||||
// MutationCacheTTL can be used to change the default TTL of one minute.
|
||||
// See source code for details.
|
||||
MutationCacheTTL *time.Duration
|
||||
|
||||
// SyncDelay defines how long to wait between receiving the most recent
|
||||
// informer event and syncing again. The default is 30 seconds.
|
||||
//
|
||||
// This is long enough that the informer cache should be up-to-date
|
||||
// (matter mostly for deletes because an out-dated cache causes
|
||||
// redundant delete API calls) and not too long that a human mistake
|
||||
// doesn't get fixed while that human is waiting for it.
|
||||
SyncDelay *time.Duration
|
||||
}
|
||||
|
||||
// Stop cancels all background activity and blocks until the controller has stopped.
|
||||
func (c *Controller) Stop() {
|
||||
if c == nil {
|
||||
@@ -154,8 +222,8 @@ func (c *Controller) Stop() {
|
||||
|
||||
// Update sets the new desired state of the resource information.
|
||||
//
|
||||
// The controller takes over ownership, so these resources must
|
||||
// not get modified after this method returns.
|
||||
// The controller is doing a deep copy, so the caller may update
|
||||
// the instance once Update returns.
|
||||
func (c *Controller) Update(resources *DriverResources) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
@@ -165,7 +233,7 @@ func (c *Controller) Update(resources *DriverResources) {
|
||||
c.queue.Add(poolName)
|
||||
}
|
||||
|
||||
c.resources = resources
|
||||
c.resources = resources.DeepCopy()
|
||||
|
||||
// ... and the new ones (might be the same).
|
||||
for poolName := range c.resources.Pools {
|
||||
@@ -173,29 +241,64 @@ func (c *Controller) Update(resources *DriverResources) {
|
||||
}
|
||||
}
|
||||
|
||||
// GetStats provides some insights into operations of the controller.
|
||||
func (c *Controller) GetStats() Stats {
|
||||
s := Stats{
|
||||
NumCreates: atomic.LoadInt64(&c.numCreates),
|
||||
NumUpdates: atomic.LoadInt64(&c.numUpdates),
|
||||
NumDeletes: atomic.LoadInt64(&c.numDeletes),
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
type Stats struct {
|
||||
// NumCreates counts the number of ResourceSlices that got created.
|
||||
NumCreates int64
|
||||
// NumUpdates counts the number of ResourceSlices that got update.
|
||||
NumUpdates int64
|
||||
// NumDeletes counts the number of ResourceSlices that got deleted.
|
||||
NumDeletes int64
|
||||
}
|
||||
|
||||
// newController creates a new controller.
|
||||
func newController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) (*Controller, error) {
|
||||
if kubeClient == nil {
|
||||
return nil, fmt.Errorf("kubeClient is nil")
|
||||
func newController(ctx context.Context, options Options) (*Controller, error) {
|
||||
if options.KubeClient == nil {
|
||||
return nil, errors.New("KubeClient is nil")
|
||||
}
|
||||
if options.DriverName == "" {
|
||||
return nil, errors.New("DRA driver name is empty")
|
||||
}
|
||||
if options.Resources == nil {
|
||||
return nil, errors.New("DriverResources are nil")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancelCause(ctx)
|
||||
|
||||
c := &Controller{
|
||||
cancel: cancel,
|
||||
kubeClient: kubeClient,
|
||||
driver: driver,
|
||||
owner: owner,
|
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
cancel: cancel,
|
||||
kubeClient: options.KubeClient,
|
||||
driverName: options.DriverName,
|
||||
owner: options.Owner.DeepCopy(),
|
||||
queue: options.Queue,
|
||||
resources: options.Resources.DeepCopy(),
|
||||
mutationCacheTTL: ptr.Deref(options.MutationCacheTTL, defaultMutationCacheTTL),
|
||||
syncDelay: ptr.Deref(options.SyncDelay, defaultSyncDelay),
|
||||
}
|
||||
if c.queue == nil {
|
||||
c.queue = workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[string](),
|
||||
workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
|
||||
),
|
||||
resources: resources,
|
||||
)
|
||||
}
|
||||
|
||||
if err := c.initInformer(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Sync each desired pool once.
|
||||
for poolName := range options.Resources.Pools {
|
||||
c.queue.Add(poolName)
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
@@ -205,10 +308,10 @@ func (c *Controller) initInformer(ctx context.Context) error {
|
||||
|
||||
// We always filter by driver name, by node name only for node-local resources.
|
||||
selector := fields.Set{
|
||||
resourceapi.ResourceSliceSelectorDriver: c.driver,
|
||||
resourceapi.ResourceSliceSelectorDriver: c.driverName,
|
||||
resourceapi.ResourceSliceSelectorNodeName: "",
|
||||
}
|
||||
if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
|
||||
if c.owner != nil && c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
|
||||
selector[resourceapi.ResourceSliceSelectorNodeName] = c.owner.Name
|
||||
}
|
||||
informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, cache.Indexers{
|
||||
@@ -222,7 +325,7 @@ func (c *Controller) initInformer(ctx context.Context) error {
|
||||
}, func(options *metav1.ListOptions) {
|
||||
options.FieldSelector = selector.String()
|
||||
})
|
||||
c.sliceStore = informer.GetIndexer()
|
||||
c.sliceStore = cache.NewIntegerResourceVersionMutationCache(informer.GetStore(), informer.GetIndexer(), c.mutationCacheTTL, true /* includeAdds */)
|
||||
handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
slice, ok := obj.(*resourceapi.ResourceSlice)
|
||||
@@ -230,7 +333,7 @@ func (c *Controller) initInformer(ctx context.Context) error {
|
||||
return
|
||||
}
|
||||
logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice))
|
||||
c.queue.Add(slice.Spec.Pool.Name)
|
||||
c.queue.AddAfter(slice.Spec.Pool.Name, c.syncDelay)
|
||||
},
|
||||
UpdateFunc: func(old, new any) {
|
||||
oldSlice, ok := old.(*resourceapi.ResourceSlice)
|
||||
@@ -246,8 +349,8 @@ func (c *Controller) initInformer(ctx context.Context) error {
|
||||
} else {
|
||||
logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice))
|
||||
}
|
||||
c.queue.Add(oldSlice.Spec.Pool.Name)
|
||||
c.queue.Add(newSlice.Spec.Pool.Name)
|
||||
c.queue.AddAfter(oldSlice.Spec.Pool.Name, c.syncDelay)
|
||||
c.queue.AddAfter(newSlice.Spec.Pool.Name, c.syncDelay)
|
||||
},
|
||||
DeleteFunc: func(obj any) {
|
||||
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||||
@@ -258,7 +361,7 @@ func (c *Controller) initInformer(ctx context.Context) error {
|
||||
return
|
||||
}
|
||||
logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice))
|
||||
c.queue.Add(slice.Spec.Pool.Name)
|
||||
c.queue.AddAfter(slice.Spec.Pool.Name, c.syncDelay)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
@@ -348,7 +451,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
|
||||
// The result gets cached and is expected to not change while
|
||||
// the controller runs.
|
||||
var nodeName string
|
||||
if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
|
||||
if c.owner != nil && c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
|
||||
nodeName = c.owner.Name
|
||||
if c.owner.UID == "" {
|
||||
node, err := c.kubeClient.CoreV1().Nodes().Get(ctx, c.owner.Name, metav1.GetOptions{})
|
||||
@@ -360,8 +463,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Slices that don't match any driver resource can either be updated (if there
|
||||
// are new driver resources that need to be stored) or they need to be deleted.
|
||||
// Slices that don't match any driver slice need to be deleted.
|
||||
obsoleteSlices := make([]*resourceapi.ResourceSlice, 0, len(slices))
|
||||
|
||||
// Determine highest generation.
|
||||
@@ -381,92 +483,233 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
|
||||
currentSlices = append(currentSlices, slice)
|
||||
}
|
||||
}
|
||||
slices = currentSlices
|
||||
|
||||
// Sort by name to ensure that keeping only the first slice is deterministic.
|
||||
sort.Slice(slices, func(i, j int) bool {
|
||||
return slices[i].Name < slices[j].Name
|
||||
})
|
||||
logger.V(5).Info("Existing slices", "obsolete", klog.KObjSlice(obsoleteSlices), "current", klog.KObjSlice(currentSlices))
|
||||
|
||||
if pool, ok := resources.Pools[poolName]; ok {
|
||||
if pool.Generation > generation {
|
||||
generation = pool.Generation
|
||||
}
|
||||
|
||||
// Right now all devices get published in a single slice.
|
||||
// We simply pick the first one, if there is one, and copy
|
||||
// it in preparation for updating it.
|
||||
// Match each existing slice against the desired slices.
|
||||
// Two slices match if they contain exactly the same
|
||||
// device IDs, in an arbitrary order. Such a matched
|
||||
// slice gets updated with the desired content if
|
||||
// there is a difference.
|
||||
//
|
||||
// TODO: support splitting across slices, with unit tests.
|
||||
if len(slices) > 0 {
|
||||
obsoleteSlices = append(obsoleteSlices, slices[1:]...)
|
||||
slices = []*resourceapi.ResourceSlice{slices[0].DeepCopy()}
|
||||
} else {
|
||||
slices = []*resourceapi.ResourceSlice{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
GenerateName: c.owner.Name + "-" + c.driver + "-",
|
||||
},
|
||||
},
|
||||
// This supports updating the definition of devices
|
||||
// in a slice. Adding or removing devices is done
|
||||
// by deleting the old slice and creating a new one.
|
||||
//
|
||||
// This is primarily a simplification of the code:
|
||||
// to support adding or removing devices from
|
||||
// existing slices, we would have to identify "most
|
||||
// similar" slices (= minimal editing distance).
|
||||
//
|
||||
// In currentSliceForDesiredSlice we keep track of
|
||||
// which desired slice has a matched slice.
|
||||
//
|
||||
// At the end of the loop, each current slice is either
|
||||
// a match or obsolete.
|
||||
currentSliceForDesiredSlice := make(map[int]*resourceapi.ResourceSlice, len(pool.Slices))
|
||||
for _, currentSlice := range currentSlices {
|
||||
matched := false
|
||||
for i := range pool.Slices {
|
||||
if _, ok := currentSliceForDesiredSlice[i]; ok {
|
||||
// Already has a match.
|
||||
continue
|
||||
}
|
||||
if sameSlice(currentSlice, &pool.Slices[i]) {
|
||||
currentSliceForDesiredSlice[i] = currentSlice
|
||||
logger.V(5).Info("Matched existing slice", "slice", klog.KObj(currentSlice), "matchIndex", i)
|
||||
matched = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !matched {
|
||||
obsoleteSlices = append(obsoleteSlices, currentSlice)
|
||||
logger.V(5).Info("Unmatched existing slice", "slice", klog.KObj(currentSlice))
|
||||
}
|
||||
}
|
||||
|
||||
slice := slices[0]
|
||||
slice.OwnerReferences = []metav1.OwnerReference{{
|
||||
APIVersion: c.owner.APIVersion,
|
||||
Kind: c.owner.Kind,
|
||||
Name: c.owner.Name,
|
||||
UID: c.owner.UID,
|
||||
Controller: ptr.To(true),
|
||||
}}
|
||||
slice.Spec.Driver = c.driver
|
||||
slice.Spec.Pool.Name = poolName
|
||||
slice.Spec.Pool.Generation = generation
|
||||
slice.Spec.Pool.ResourceSliceCount = 1
|
||||
slice.Spec.NodeName = nodeName
|
||||
slice.Spec.NodeSelector = pool.NodeSelector
|
||||
slice.Spec.AllNodes = pool.NodeSelector == nil && nodeName == ""
|
||||
slice.Spec.Devices = pool.Devices
|
||||
// Desired metadata which must be set in each slice.
|
||||
resourceSliceCount := len(pool.Slices)
|
||||
numMatchedSlices := len(currentSliceForDesiredSlice)
|
||||
numNewSlices := resourceSliceCount - numMatchedSlices
|
||||
desiredPool := resourceapi.ResourcePool{
|
||||
Name: poolName,
|
||||
Generation: generation, // May get updated later.
|
||||
ResourceSliceCount: int64(resourceSliceCount),
|
||||
}
|
||||
desiredAllNodes := pool.NodeSelector == nil && nodeName == ""
|
||||
|
||||
if loggerV := logger.V(6); loggerV.Enabled() {
|
||||
// Dump entire resource information.
|
||||
loggerV.Info("Syncing resource slices", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "pool", pool)
|
||||
} else {
|
||||
logger.V(5).Info("Syncing resource slices", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices))
|
||||
// Now for each desired slice, figure out which of them are changed.
|
||||
changedDesiredSlices := sets.New[int]()
|
||||
for i, currentSlice := range currentSliceForDesiredSlice {
|
||||
// Reordering entries is a difference and causes an update even if the
|
||||
// entries are the same.
|
||||
if !apiequality.Semantic.DeepEqual(¤tSlice.Spec.Pool, &desiredPool) ||
|
||||
!apiequality.Semantic.DeepEqual(currentSlice.Spec.NodeSelector, pool.NodeSelector) ||
|
||||
currentSlice.Spec.AllNodes != desiredAllNodes ||
|
||||
!apiequality.Semantic.DeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) {
|
||||
changedDesiredSlices.Insert(i)
|
||||
logger.V(5).Info("Need to update slice", "slice", klog.KObj(currentSlice), "matchIndex", i)
|
||||
}
|
||||
}
|
||||
logger.V(5).Info("Completed comparison",
|
||||
"numObsolete", len(obsoleteSlices),
|
||||
"numMatchedSlices", len(currentSliceForDesiredSlice),
|
||||
"numChangedMatchedSlices", len(changedDesiredSlices),
|
||||
"numNewSlices", numNewSlices,
|
||||
)
|
||||
|
||||
bumpedGeneration := false
|
||||
switch {
|
||||
case pool.Generation > generation:
|
||||
// Bump up the generation if the driver asked for it, or
|
||||
// start with a non-zero generation.
|
||||
generation = pool.Generation
|
||||
bumpedGeneration = true
|
||||
logger.V(5).Info("Bumped generation to driver-provided generation", "generation", generation)
|
||||
case numNewSlices == 0 && len(changedDesiredSlices) <= 1:
|
||||
logger.V(5).Info("Kept generation because at most one update API call is necessary", "generation", generation)
|
||||
default:
|
||||
generation++
|
||||
bumpedGeneration = true
|
||||
logger.V(5).Info("Bumped generation by one", "generation", generation)
|
||||
}
|
||||
desiredPool.Generation = generation
|
||||
|
||||
// Update existing slices.
|
||||
for i, currentSlice := range currentSliceForDesiredSlice {
|
||||
if !changedDesiredSlices.Has(i) && !bumpedGeneration {
|
||||
continue
|
||||
}
|
||||
slice := currentSlice.DeepCopy()
|
||||
slice.Spec.Pool = desiredPool
|
||||
// No need to set the node name. If it was different, we wouldn't
|
||||
// have listed the existing slice.
|
||||
slice.Spec.NodeSelector = pool.NodeSelector
|
||||
slice.Spec.AllNodes = desiredAllNodes
|
||||
slice.Spec.Devices = pool.Slices[i].Devices
|
||||
|
||||
logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
|
||||
slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("update resource slice: %w", err)
|
||||
}
|
||||
atomic.AddInt64(&c.numUpdates, 1)
|
||||
c.sliceStore.Mutation(slice)
|
||||
}
|
||||
|
||||
// Create new slices.
|
||||
added := false
|
||||
for i := 0; i < len(pool.Slices); i++ {
|
||||
if _, ok := currentSliceForDesiredSlice[i]; ok {
|
||||
// Was handled above through an update.
|
||||
continue
|
||||
}
|
||||
var ownerReferences []metav1.OwnerReference
|
||||
if c.owner != nil {
|
||||
ownerReferences = append(ownerReferences,
|
||||
metav1.OwnerReference{
|
||||
APIVersion: c.owner.APIVersion,
|
||||
Kind: c.owner.Kind,
|
||||
Name: c.owner.Name,
|
||||
UID: c.owner.UID,
|
||||
Controller: ptr.To(true),
|
||||
},
|
||||
)
|
||||
}
|
||||
generateName := c.driverName + "-"
|
||||
if c.owner != nil {
|
||||
generateName = c.owner.Name + "-" + generateName
|
||||
}
|
||||
slice := &resourceapi.ResourceSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
OwnerReferences: ownerReferences,
|
||||
GenerateName: generateName,
|
||||
},
|
||||
Spec: resourceapi.ResourceSliceSpec{
|
||||
Driver: c.driverName,
|
||||
Pool: desiredPool,
|
||||
NodeName: nodeName,
|
||||
NodeSelector: pool.NodeSelector,
|
||||
AllNodes: desiredAllNodes,
|
||||
Devices: pool.Slices[i].Devices,
|
||||
},
|
||||
}
|
||||
|
||||
// It can happen that we create a missing slice, some
|
||||
// other change than the create causes another sync of
|
||||
// the pool, and then a second slice for the same set
|
||||
// of devices would get created because the controller has
|
||||
// no copy of the first slice instance in its informer
|
||||
// cache yet.
|
||||
//
|
||||
// Using a https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache
|
||||
// avoids that.
|
||||
logger.V(5).Info("Creating new resource slice")
|
||||
slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("create resource slice: %w", err)
|
||||
}
|
||||
atomic.AddInt64(&c.numCreates, 1)
|
||||
c.sliceStore.Mutation(slice)
|
||||
added = true
|
||||
}
|
||||
if added {
|
||||
// Check that the recently added slice(s) really exist even
|
||||
// after they expired from the mutation cache.
|
||||
c.queue.AddAfter(poolName, c.mutationCacheTTL)
|
||||
}
|
||||
} else if len(slices) > 0 {
|
||||
// All are obsolete, pool does not exist anymore.
|
||||
|
||||
logger.V(5).Info("Removing resource slices after pool removal", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices))
|
||||
obsoleteSlices = append(obsoleteSlices, slices...)
|
||||
// No need to create or update the slices.
|
||||
slices = nil
|
||||
obsoleteSlices = slices
|
||||
logger.V(5).Info("Removing resource slices after pool removal")
|
||||
}
|
||||
|
||||
// Remove stale slices.
|
||||
for _, slice := range obsoleteSlices {
|
||||
logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice))
|
||||
if err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
|
||||
options := metav1.DeleteOptions{
|
||||
Preconditions: &metav1.Preconditions{
|
||||
UID: &slice.UID,
|
||||
ResourceVersion: &slice.ResourceVersion,
|
||||
},
|
||||
}
|
||||
// It can happen that we sync again shortly after deleting a
|
||||
// slice and before the slice gets removed from the informer
|
||||
// cache. The MutationCache can't help here because it does not
|
||||
// track pending deletes.
|
||||
//
|
||||
// If this happens, we get a "not found error" and nothing
|
||||
// changes on the server. The only downside is the extra API
|
||||
// call. This isn't as bad as extra creates.
|
||||
logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice), "deleteOptions", options)
|
||||
err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, options)
|
||||
switch {
|
||||
case err == nil:
|
||||
atomic.AddInt64(&c.numDeletes, 1)
|
||||
case apierrors.IsNotFound(err):
|
||||
logger.V(5).Info("Resource slice was already deleted earlier", "slice", klog.KObj(slice))
|
||||
default:
|
||||
return fmt.Errorf("delete resource slice: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create or update slices.
|
||||
for _, slice := range slices {
|
||||
if slice.UID == "" {
|
||||
logger.V(5).Info("Creating new resource slice", "slice", klog.KObj(slice))
|
||||
if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
|
||||
return fmt.Errorf("create resource slice: %w", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: switch to SSA once unit testing supports it.
|
||||
logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
|
||||
if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
|
||||
return fmt.Errorf("update resource slice: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func sameSlice(existingSlice *resourceapi.ResourceSlice, desiredSlice *Slice) bool {
|
||||
if len(existingSlice.Spec.Devices) != len(desiredSlice.Devices) {
|
||||
return false
|
||||
}
|
||||
|
||||
existingDevices := sets.New[string]()
|
||||
for _, device := range existingSlice.Spec.Devices {
|
||||
existingDevices.Insert(device.Name)
|
||||
}
|
||||
for _, device := range desiredSlice.Devices {
|
||||
if !existingDevices.Has(device.Name) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Same number of devices, names all present -> equal.
|
||||
return true
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@@ -73,9 +73,9 @@ func (in *Pool) DeepCopyInto(out *Pool) {
|
||||
*out = new(v1.NodeSelector)
|
||||
(*in).DeepCopyInto(*out)
|
||||
}
|
||||
if in.Devices != nil {
|
||||
in, out := &in.Devices, &out.Devices
|
||||
*out = make([]v1alpha3.Device, len(*in))
|
||||
if in.Slices != nil {
|
||||
in, out := &in.Slices, &out.Slices
|
||||
*out = make([]Slice, len(*in))
|
||||
for i := range *in {
|
||||
(*in)[i].DeepCopyInto(&(*out)[i])
|
||||
}
|
||||
@@ -92,3 +92,26 @@ func (in *Pool) DeepCopy() *Pool {
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *Slice) DeepCopyInto(out *Slice) {
|
||||
*out = *in
|
||||
if in.Devices != nil {
|
||||
in, out := &in.Devices, &out.Devices
|
||||
*out = make([]v1alpha3.Device, len(*in))
|
||||
for i := range *in {
|
||||
(*in)[i].DeepCopyInto(&(*out)[i])
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Slice.
|
||||
func (in *Slice) DeepCopy() *Slice {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(Slice)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
@@ -198,6 +198,9 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
|
||||
if pool.IsIncomplete {
|
||||
return nil, fmt.Errorf("claim %s, request %s: asks for all devices, but resource pool %s is currently being updated", klog.KObj(claim), request.Name, pool.PoolID)
|
||||
}
|
||||
if pool.IsInvalid {
|
||||
return nil, fmt.Errorf("claim %s, request %s: asks for all devices, but resource pool %s is currently invalid", klog.KObj(claim), request.Name, pool.PoolID)
|
||||
}
|
||||
|
||||
for _, slice := range pool.Slices {
|
||||
for deviceIndex := range slice.Spec.Devices {
|
||||
@@ -599,6 +602,13 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
// If the pool is not valid, then fail now. It's okay when pools of one driver
|
||||
// are invalid if we allocate from some other pool, but it's not safe to
|
||||
// allocated from an invalid pool.
|
||||
if pool.IsInvalid {
|
||||
return false, fmt.Errorf("pool %s is invalid: %s", pool.Pool, pool.InvalidReason)
|
||||
}
|
||||
|
||||
// Finally treat as allocated and move on to the next device.
|
||||
allocated, deallocate, err := alloc.allocateDevice(r, slice.Spec.Devices[deviceIndex].Basic, deviceID, false)
|
||||
if err != nil {
|
||||
|
@@ -535,6 +535,28 @@ func TestAllocator(t *testing.T) {
|
||||
deviceAllocationResult(req0, driverA, pool1, device1, false),
|
||||
)},
|
||||
},
|
||||
"duplicate-slice": {
|
||||
claimsToAllocate: objects(claim(claim0, req0, classA)),
|
||||
classes: objects(class(classA, driverA)),
|
||||
slices: func() []*resourceapi.ResourceSlice {
|
||||
// This simulates the problem that can
|
||||
// (theoretically) occur when the resource
|
||||
// slice controller wants to publish a pool
|
||||
// with two slices but ends up creating some
|
||||
// identical slices under different names
|
||||
// because its informer cache was out-dated on
|
||||
// another sync (see
|
||||
// resourceslicecontroller.go).
|
||||
sliceA := sliceWithOneDevice(slice1, node1, pool1, driverA)
|
||||
sliceA.Spec.Pool.ResourceSliceCount = 2
|
||||
sliceB := sliceA.DeepCopy()
|
||||
sliceB.Name += "-2"
|
||||
return []*resourceapi.ResourceSlice{sliceA, sliceB}
|
||||
}(),
|
||||
node: node(node1, region1),
|
||||
|
||||
expectError: gomega.MatchError(gomega.ContainSubstring(fmt.Sprintf("pool %s is invalid: duplicate device name %s", pool1, device1))),
|
||||
},
|
||||
"no-slices": {
|
||||
claimsToAllocate: objects(claim(claim0, req0, classA)),
|
||||
classes: objects(class(classA, driverA)),
|
||||
|
@@ -23,6 +23,7 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
resourceapi "k8s.io/api/resource/v1alpha3"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
|
||||
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
|
||||
)
|
||||
@@ -30,8 +31,9 @@ import (
|
||||
// GatherPools collects information about all resource pools which provide
|
||||
// devices that are accessible from the given node.
|
||||
//
|
||||
// Out-dated slices are silently ignored. Pools may be incomplete, which is
|
||||
// recorded in the result.
|
||||
// Out-dated slices are silently ignored. Pools may be incomplete (not all
|
||||
// required slices available) or invalid (for example, device names not unique).
|
||||
// Both is recorded in the result.
|
||||
func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceLister, node *v1.Node) ([]*Pool, error) {
|
||||
pools := make(map[PoolID]*Pool)
|
||||
|
||||
@@ -75,6 +77,7 @@ func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceL
|
||||
result := make([]*Pool, 0, len(pools))
|
||||
for _, pool := range pools {
|
||||
pool.IsIncomplete = int64(len(pool.Slices)) != pool.Slices[0].Spec.Pool.ResourceSliceCount
|
||||
pool.IsInvalid, pool.InvalidReason = poolIsInvalid(pool)
|
||||
result = append(result, pool)
|
||||
}
|
||||
|
||||
@@ -101,17 +104,32 @@ func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) {
|
||||
|
||||
if slice.Spec.Pool.Generation > pool.Slices[0].Spec.Pool.Generation {
|
||||
// Newer, replaces all old slices.
|
||||
pool.Slices = []*resourceapi.ResourceSlice{slice}
|
||||
pool.Slices = nil
|
||||
}
|
||||
|
||||
// Add to pool.
|
||||
pool.Slices = append(pool.Slices, slice)
|
||||
}
|
||||
|
||||
func poolIsInvalid(pool *Pool) (bool, string) {
|
||||
devices := sets.New[string]()
|
||||
for _, slice := range pool.Slices {
|
||||
for _, device := range slice.Spec.Devices {
|
||||
if devices.Has(device.Name) {
|
||||
return true, fmt.Sprintf("duplicate device name %s", device.Name)
|
||||
}
|
||||
devices.Insert(device.Name)
|
||||
}
|
||||
}
|
||||
return false, ""
|
||||
}
|
||||
|
||||
type Pool struct {
|
||||
PoolID
|
||||
IsIncomplete bool
|
||||
Slices []*resourceapi.ResourceSlice
|
||||
IsIncomplete bool
|
||||
IsInvalid bool
|
||||
InvalidReason string
|
||||
Slices []*resourceapi.ResourceSlice
|
||||
}
|
||||
|
||||
type PoolID struct {
|
||||
|
@@ -40,8 +40,10 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/validation"
|
||||
applyv1 "k8s.io/client-go/applyconfigurations/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/dynamic-resource-allocation/resourceslice"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/test/e2e/feature"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
@@ -729,6 +731,97 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
|
||||
// TODO (https://github.com/kubernetes/kubernetes/issues/123699): move most of the test below into `testDriver` so that they get
|
||||
// executed with different parameters.
|
||||
|
||||
ginkgo.Context("ResourceSlice Controller", func() {
|
||||
// This is a stress test for creating many large slices.
|
||||
// Each slice is as large as API limits allow.
|
||||
//
|
||||
// Could become a conformance test because it only depends
|
||||
// on the apiserver.
|
||||
f.It("creates slices", func(ctx context.Context) {
|
||||
// Define desired resource slices.
|
||||
driverName := f.Namespace.Name
|
||||
numSlices := 100
|
||||
devicePrefix := "dev-"
|
||||
domainSuffix := ".example.com"
|
||||
poolName := "network-attached"
|
||||
domain := strings.Repeat("x", 63 /* TODO(pohly): add to API */ -len(domainSuffix)) + domainSuffix
|
||||
stringValue := strings.Repeat("v", resourceapi.DeviceAttributeMaxValueLength)
|
||||
pool := resourceslice.Pool{
|
||||
Slices: make([]resourceslice.Slice, numSlices),
|
||||
}
|
||||
for i := 0; i < numSlices; i++ {
|
||||
devices := make([]resourceapi.Device, resourceapi.ResourceSliceMaxDevices)
|
||||
for e := 0; e < resourceapi.ResourceSliceMaxDevices; e++ {
|
||||
device := resourceapi.Device{
|
||||
Name: devicePrefix + strings.Repeat("x", validation.DNS1035LabelMaxLength-len(devicePrefix)-4) + fmt.Sprintf("%04d", e),
|
||||
Basic: &resourceapi.BasicDevice{
|
||||
Attributes: make(map[resourceapi.QualifiedName]resourceapi.DeviceAttribute, resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice),
|
||||
},
|
||||
}
|
||||
for j := 0; j < resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice; j++ {
|
||||
name := resourceapi.QualifiedName(domain + "/" + strings.Repeat("x", resourceapi.DeviceMaxIDLength-4) + fmt.Sprintf("%04d", j))
|
||||
device.Basic.Attributes[name] = resourceapi.DeviceAttribute{
|
||||
StringValue: &stringValue,
|
||||
}
|
||||
}
|
||||
devices[e] = device
|
||||
}
|
||||
pool.Slices[i].Devices = devices
|
||||
}
|
||||
resources := &resourceslice.DriverResources{
|
||||
Pools: map[string]resourceslice.Pool{poolName: pool},
|
||||
}
|
||||
|
||||
ginkgo.By("Creating slices")
|
||||
mutationCacheTTL := 10 * time.Second
|
||||
controller, err := resourceslice.StartController(ctx, resourceslice.Options{
|
||||
DriverName: driverName,
|
||||
KubeClient: f.ClientSet,
|
||||
Resources: resources,
|
||||
MutationCacheTTL: &mutationCacheTTL,
|
||||
})
|
||||
framework.ExpectNoError(err, "start controller")
|
||||
ginkgo.DeferCleanup(func(ctx context.Context) {
|
||||
controller.Stop()
|
||||
err := f.ClientSet.ResourceV1alpha3().ResourceSlices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
|
||||
FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + driverName,
|
||||
})
|
||||
framework.ExpectNoError(err, "delete resource slices")
|
||||
})
|
||||
|
||||
// Eventually we should have all desired slices.
|
||||
listSlices := framework.ListObjects(f.ClientSet.ResourceV1alpha3().ResourceSlices().List, metav1.ListOptions{
|
||||
FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + driverName,
|
||||
})
|
||||
gomega.Eventually(ctx, listSlices).WithTimeout(time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(numSlices)))
|
||||
|
||||
// Verify state.
|
||||
expectSlices, err := listSlices(ctx)
|
||||
framework.ExpectNoError(err)
|
||||
gomega.Expect(expectSlices.Items).ShouldNot(gomega.BeEmpty())
|
||||
framework.Logf("Protobuf size of one slice is %d bytes = %d KB.", expectSlices.Items[0].Size(), expectSlices.Items[0].Size()/1024)
|
||||
gomega.Expect(expectSlices.Items[0].Size()).Should(gomega.BeNumerically(">=", 600*1024), "ResourceSlice size")
|
||||
gomega.Expect(expectSlices.Items[0].Size()).Should(gomega.BeNumerically("<", 1024*1024), "ResourceSlice size")
|
||||
expectStats := resourceslice.Stats{NumCreates: int64(numSlices)}
|
||||
gomega.Expect(controller.GetStats()).Should(gomega.Equal(expectStats))
|
||||
|
||||
// No further changes expected now, after after checking again.
|
||||
gomega.Consistently(ctx, controller.GetStats).WithTimeout(2 * mutationCacheTTL).Should(gomega.Equal(expectStats))
|
||||
|
||||
// Ask the controller to delete all slices except for one empty slice.
|
||||
ginkgo.By("Deleting slices")
|
||||
resources = resources.DeepCopy()
|
||||
resources.Pools[poolName] = resourceslice.Pool{Slices: []resourceslice.Slice{{}}}
|
||||
controller.Update(resources)
|
||||
|
||||
// One empty slice should remain, after removing the full ones and adding the empty one.
|
||||
emptySlice := gomega.HaveField("Spec.Devices", gomega.BeEmpty())
|
||||
gomega.Eventually(ctx, listSlices).WithTimeout(time.Minute).Should(gomega.HaveField("Items", gomega.ConsistOf(emptySlice)))
|
||||
expectStats = resourceslice.Stats{NumCreates: int64(numSlices) + 1, NumDeletes: int64(numSlices)}
|
||||
gomega.Consistently(ctx, controller.GetStats).WithTimeout(2 * mutationCacheTTL).Should(gomega.Equal(expectStats))
|
||||
})
|
||||
})
|
||||
|
||||
ginkgo.Context("cluster", func() {
|
||||
nodes := NewNodes(f, 1, 1)
|
||||
driver := NewDriver(f, nodes, networkResources)
|
||||
|
Reference in New Issue
Block a user