mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
DRA resource slice controller: use MutationCache to avoid race
This avoids the problem of creating an additional slice when the one from the previous sync is not in the informer cache yet. It also avoids false attempts to delete slices which were updated in the previous sync. Such attempts would fail the ResourceVersion precondition check, but would still cause work for the apiserver.
This commit is contained in:
parent
e88d5c37e6
commit
7473e643fa
@ -51,6 +51,13 @@ const (
|
||||
// poolNameIndex is the name for the ResourceSlice store's index function,
|
||||
// which is to index by ResourceSlice.Spec.Pool.Name
|
||||
poolNameIndex = "poolName"
|
||||
|
||||
// Including adds in the mutation cache is not safe: We could add a slice, store it,
|
||||
// and then the slice gets deleted without the informer hearing anything about that.
|
||||
// Then the obsolete slice remains in the mutation cache.
|
||||
//
|
||||
// To mitigate this, we use a TTL and check a pool again once added slices expire.
|
||||
defaultMutationCacheTTL = time.Minute
|
||||
)
|
||||
|
||||
// Controller synchronizes information about resources of one driver with
|
||||
@ -64,8 +71,9 @@ type Controller struct {
|
||||
kubeClient kubernetes.Interface
|
||||
wg sync.WaitGroup
|
||||
// The queue is keyed with the pool name that needs work.
|
||||
queue workqueue.TypedRateLimitingInterface[string]
|
||||
sliceStore cache.Indexer
|
||||
queue workqueue.TypedRateLimitingInterface[string]
|
||||
sliceStore cache.MutationCache
|
||||
mutationCacheTTL time.Duration
|
||||
|
||||
// Must use atomic access...
|
||||
numCreates int64
|
||||
@ -182,6 +190,10 @@ type Options struct {
|
||||
|
||||
// Queue can be used to override the default work queue implementation.
|
||||
Queue workqueue.TypedRateLimitingInterface[string]
|
||||
|
||||
// MutationCacheTTL can be used to change the default TTL of one minute.
|
||||
// See source code for details.
|
||||
MutationCacheTTL *time.Duration
|
||||
}
|
||||
|
||||
// Stop cancels all background activity and blocks until the controller has stopped.
|
||||
@ -249,12 +261,13 @@ func newController(ctx context.Context, options Options) (*Controller, error) {
|
||||
ctx, cancel := context.WithCancelCause(ctx)
|
||||
|
||||
c := &Controller{
|
||||
cancel: cancel,
|
||||
kubeClient: options.KubeClient,
|
||||
driverName: options.DriverName,
|
||||
owner: options.Owner.DeepCopy(),
|
||||
queue: options.Queue,
|
||||
resources: options.Resources,
|
||||
cancel: cancel,
|
||||
kubeClient: options.KubeClient,
|
||||
driverName: options.DriverName,
|
||||
owner: options.Owner.DeepCopy(),
|
||||
queue: options.Queue,
|
||||
resources: options.Resources,
|
||||
mutationCacheTTL: defaultMutationCacheTTL,
|
||||
}
|
||||
if c.queue == nil {
|
||||
c.queue = workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
@ -262,7 +275,9 @@ func newController(ctx context.Context, options Options) (*Controller, error) {
|
||||
workqueue.TypedRateLimitingQueueConfig[string]{Name: "node_resource_slices"},
|
||||
)
|
||||
}
|
||||
|
||||
if options.MutationCacheTTL != nil {
|
||||
c.mutationCacheTTL = *options.MutationCacheTTL
|
||||
}
|
||||
if err := c.initInformer(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -298,7 +313,7 @@ func (c *Controller) initInformer(ctx context.Context) error {
|
||||
}, func(options *metav1.ListOptions) {
|
||||
options.FieldSelector = selector.String()
|
||||
})
|
||||
c.sliceStore = informer.GetIndexer()
|
||||
c.sliceStore = cache.NewIntegerResourceVersionMutationCache(informer.GetStore(), informer.GetIndexer(), c.mutationCacheTTL, true /* includeAdds */)
|
||||
handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj any) {
|
||||
slice, ok := obj.(*resourceapi.ResourceSlice)
|
||||
@ -562,13 +577,16 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
|
||||
slice.Spec.Devices = pool.Slices[i].Devices
|
||||
|
||||
logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
|
||||
if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
|
||||
slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("update resource slice: %w", err)
|
||||
}
|
||||
atomic.AddInt64(&c.numUpdates, 1)
|
||||
c.sliceStore.Mutation(slice)
|
||||
}
|
||||
|
||||
// Create new slices.
|
||||
added := false
|
||||
for i := 0; i < len(pool.Slices); i++ {
|
||||
if _, ok := currentSliceForDesiredSlice[i]; ok {
|
||||
// Was handled above through an update.
|
||||
@ -608,21 +626,25 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
|
||||
// It can happen that we create a missing slice, some
|
||||
// other change than the create causes another sync of
|
||||
// the pool, and then a second slice for the same set
|
||||
// of devices gets created because the controller has
|
||||
// of devices would get created because the controller has
|
||||
// no copy of the first slice instance in its informer
|
||||
// cache yet.
|
||||
//
|
||||
// This is not a problem: a client will either see one
|
||||
// of the two slices and use it or see both and do
|
||||
// nothing because of the duplicated device IDs.
|
||||
//
|
||||
// To avoid creating a second slice, we would have to use a
|
||||
// https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache.
|
||||
// Using a https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache
|
||||
// avoids that.
|
||||
logger.V(5).Info("Creating new resource slice")
|
||||
if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
|
||||
slice, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("create resource slice: %w", err)
|
||||
}
|
||||
atomic.AddInt64(&c.numCreates, 1)
|
||||
c.sliceStore.Mutation(slice)
|
||||
added = true
|
||||
}
|
||||
if added {
|
||||
// Check that the recently added slice(s) really exist even
|
||||
// after they expired from the mutation cache.
|
||||
c.queue.AddAfter(poolName, c.mutationCacheTTL)
|
||||
}
|
||||
} else if len(slices) > 0 {
|
||||
// All are obsolete, pool does not exist anymore.
|
||||
|
@ -674,9 +674,14 @@ func TestControllerSyncPool(t *testing.T) {
|
||||
assert.Equal(t, test.expectedStats, ctrl.GetStats())
|
||||
|
||||
// The informer might have added a work item after ctrl.run returned.
|
||||
state := queue.State()
|
||||
state.Ready = nil
|
||||
assert.Equal(t, workqueue.MockState[string]{}, state)
|
||||
actualState := queue.State()
|
||||
actualState.Ready = nil
|
||||
var expectState workqueue.MockState[string]
|
||||
if test.expectedStats.NumCreates > 0 {
|
||||
expectState.Later = []workqueue.MockDelayedItem[string]{{Item: poolName, Duration: defaultMutationCacheTTL}}
|
||||
}
|
||||
assert.Equal(t, expectState, actualState)
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user