DRA: validate set of devices in a pool before using the pool

The ResourceSlice controller (theoretically) might end up creating too many
slices if it syncs again before its informer cache has been updated. This
could cause the scheduler to allocate a device from a duplicated slice. The
duplicates should be identical, but it's still better to fail and wait until
the controller removes the redundant slice.
Patrick Ohly 2024-10-25 21:47:10 +02:00
parent 26650371cc
commit a6d180c7d3
4 changed files with 69 additions and 5 deletions
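
Before the diffs, here is a minimal standalone sketch of the validation idea this
commit introduces: before using a pool, check whether any device name appears in
more than one of its slices. The types and helper names are simplified
illustrations, not the code from this commit (that code is in the pools.go hunk
further down).

package main

import "fmt"

// poolSlice is an illustrative stand-in for resourceapi.ResourceSlice,
// reduced to the fields that matter for this check.
type poolSlice struct {
	Name    string
	Devices []string
}

// duplicateDevice returns the first device name that is published by more
// than one slice of the same pool.
func duplicateDevice(slices []poolSlice) (string, bool) {
	seen := map[string]bool{}
	for _, s := range slices {
		for _, d := range s.Devices {
			if seen[d] {
				return d, true
			}
			seen[d] = true
		}
	}
	return "", false
}

func main() {
	// Two slices that accidentally publish the same device, as can happen
	// when the controller creates a redundant slice.
	slices := []poolSlice{
		{Name: "pool1-slice", Devices: []string{"device-1"}},
		{Name: "pool1-slice-2", Devices: []string{"device-1"}},
	}
	if name, ok := duplicateDevice(slices); ok {
		fmt.Printf("pool is invalid: duplicate device name %s\n", name)
	}
}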


@@ -578,6 +578,20 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
 					Devices: pool.Slices[i].Devices,
 				},
 			}
+			// It can happen that we create a missing slice, some
+			// other change than the create causes another sync of
+			// the pool, and then a second slice for the same set
+			// of devices gets created because the controller has
+			// no copy of the first slice instance in its informer
+			// cache yet.
+			//
+			// This is not a problem: a client will either see one
+			// of the two slices and use it or see both and do
+			// nothing because of the duplicated device IDs.
+			//
+			// To avoid creating a second slice, we would have to use a
+			// https://pkg.go.dev/k8s.io/client-go/tools/cache#MutationCache.
+			logger.V(5).Info("Creating new resource slice")
 			if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
 				return fmt.Errorf("create resource slice: %w", err)
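
The comment in the hunk above mentions client-go's MutationCache as the way to
remember writes that the informer cache does not reflect yet. The controller in
this commit deliberately does not do that and relies on the allocator-side
validation instead, but the idea can be illustrated with a small hand-rolled
overlay; the type and function names below are hypothetical, not part of
client-go or of this commit.

package overlay

import (
	"sync"
	"time"
)

// resourceSlice is a simplified stand-in for resourceapi.ResourceSlice.
type resourceSlice struct {
	Name string
	Pool string
}

// createdOverlay remembers slices a controller created recently, so that a
// later sync can merge them into the (possibly stale) informer listing
// instead of concluding that a slice is still missing. It is a toy stand-in
// for the layering that cache.MutationCache provides.
type createdOverlay struct {
	mu      sync.Mutex
	ttl     time.Duration
	created map[string][]entry // pool name -> recently created slices
}

type entry struct {
	slice   resourceSlice
	expires time.Time
}

func newCreatedOverlay(ttl time.Duration) *createdOverlay {
	return &createdOverlay{ttl: ttl, created: map[string][]entry{}}
}

// remember records a slice right after a successful Create call.
func (o *createdOverlay) remember(s resourceSlice) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.created[s.Pool] = append(o.created[s.Pool], entry{slice: s, expires: time.Now().Add(o.ttl)})
}

// merge returns the informer's view of a pool plus any not-yet-expired
// slices the controller created itself, de-duplicated by name.
func (o *createdOverlay) merge(pool string, fromInformer []resourceSlice) []resourceSlice {
	o.mu.Lock()
	defer o.mu.Unlock()
	byName := map[string]bool{}
	merged := make([]resourceSlice, 0, len(fromInformer))
	for _, s := range fromInformer {
		byName[s.Name] = true
		merged = append(merged, s)
	}
	now := time.Now()
	kept := o.created[pool][:0]
	for _, e := range o.created[pool] {
		if now.After(e.expires) {
			continue
		}
		kept = append(kept, e)
		if !byName[e.slice.Name] {
			merged = append(merged, e.slice)
		}
	}
	o.created[pool] = kept
	return merged
}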


@@ -187,6 +187,9 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
 			if pool.IsIncomplete {
 				return nil, fmt.Errorf("claim %s, request %s: asks for all devices, but resource pool %s is currently being updated", klog.KObj(claim), request.Name, pool.PoolID)
 			}
+			if pool.IsInvalid {
+				return nil, fmt.Errorf("claim %s, request %s: asks for all devices, but resource pool %s is currently invalid", klog.KObj(claim), request.Name, pool.PoolID)
+			}
 			for _, slice := range pool.Slices {
 				for deviceIndex := range slice.Spec.Devices {
@@ -574,6 +577,13 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
 				continue
 			}
+			// If the pool is not valid, then fail now. It's okay when pools of one driver
+			// are invalid if we allocate from some other pool, but it's not safe to
+			// allocate from an invalid pool.
+			if pool.IsInvalid {
+				return false, fmt.Errorf("pool %s is invalid: %s", pool.Pool, pool.InvalidReason)
+			}
 			// Finally treat as allocated and move on to the next device.
 			allocated, deallocate, err := alloc.allocateDevice(r, slice.Spec.Devices[deviceIndex].Basic, deviceID, false)
 			if err != nil {
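
The placement of the new check matters: an invalid pool only causes a failure when
the allocator actually reaches one of its devices, so claims that can be satisfied
entirely from other, valid pools still succeed. A much-simplified model of that
behavior (not the real allocator; types and helpers are illustrative):

package main

import (
	"errors"
	"fmt"
)

// pool is a reduced stand-in for the allocator's view of a resource pool.
type pool struct {
	Name          string
	IsInvalid     bool
	InvalidReason string
	Devices       []string
}

// allocateOne walks pools in order and picks the first free device. The
// invalid check sits inside the device loop: pools that are never reached
// do not affect the result, but as soon as a device from an invalid pool
// would be used, it is safer to fail and let the scheduler retry after the
// redundant slice has been removed.
func allocateOne(pools []pool, allocated map[string]bool) (string, error) {
	for _, p := range pools {
		for _, device := range p.Devices {
			if allocated[device] {
				continue
			}
			if p.IsInvalid {
				return "", fmt.Errorf("pool %s is invalid: %s", p.Name, p.InvalidReason)
			}
			allocated[device] = true
			return device, nil
		}
	}
	return "", errors.New("no suitable device found")
}

func main() {
	pools := []pool{
		{Name: "pool-a", Devices: []string{"dev-1"}},
		{Name: "pool-b", IsInvalid: true, InvalidReason: "duplicate device name dev-2", Devices: []string{"dev-2", "dev-2"}},
	}
	allocated := map[string]bool{}
	// First allocation succeeds: pool-a satisfies it, pool-b is never touched.
	fmt.Println(allocateOne(pools, allocated))
	// Second allocation fails: the only remaining devices are in the invalid pool.
	fmt.Println(allocateOne(pools, allocated))
}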


@@ -529,6 +529,28 @@ func TestAllocator(t *testing.T) {
 				deviceAllocationResult(req0, driverA, pool1, device1),
 			)},
 		},
+		"duplicate-slice": {
+			claimsToAllocate: objects(claim(claim0, req0, classA)),
+			classes: objects(class(classA, driverA)),
+			slices: func() []*resourceapi.ResourceSlice {
+				// This simulates the problem that can
+				// (theoretically) occur when the resource
+				// slice controller wants to publish a pool
+				// with two slices but ends up creating some
+				// identical slices under different names
+				// because its informer cache was out-dated on
+				// another sync (see
+				// resourceslicecontroller.go).
+				sliceA := sliceWithOneDevice(slice1, node1, pool1, driverA)
+				sliceA.Spec.Pool.ResourceSliceCount = 2
+				sliceB := sliceA.DeepCopy()
+				sliceB.Name += "-2"
+				return []*resourceapi.ResourceSlice{sliceA, sliceB}
+			}(),
+			node: node(node1, region1),
+			expectError: gomega.MatchError(gomega.ContainSubstring(fmt.Sprintf("pool %s is invalid: duplicate device name %s", pool1, device1))),
+		},
 		"no-slices": {
 			claimsToAllocate: objects(claim(claim0, req0, classA)),
 			classes: objects(class(classA, driverA)),
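
For reference, the duplicate slices that the new test case builds through its
helpers boil down to two ResourceSlice objects that claim to be the two halves of
the same pool while publishing the same device. A hedged sketch using the
resource.k8s.io/v1alpha3 types referenced in this diff; concrete names such as
"pool-1" or "driver-a.example.com" are made up for illustration:

package main

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// One slice that claims to be half of a two-slice pool and publishes a
	// single device.
	sliceA := &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "pool-1-slice"},
		Spec: resourceapi.ResourceSliceSpec{
			Driver:   "driver-a.example.com",
			NodeName: "node-1",
			Pool: resourceapi.ResourcePool{
				Name:               "pool-1",
				Generation:         1,
				ResourceSliceCount: 2,
			},
			Devices: []resourceapi.Device{
				{Name: "device-1", Basic: &resourceapi.BasicDevice{}},
			},
		},
	}

	// The "second half" of the pool is an accidental copy under a different
	// name, so the pool as a whole now lists device-1 twice.
	sliceB := sliceA.DeepCopy()
	sliceB.Name += "-2"

	fmt.Println(sliceA.Name, sliceB.Name, sliceB.Spec.Devices[0].Name)
}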


@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	resourceapi "k8s.io/api/resource/v1alpha3"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/sets"
 	resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 )
@@ -30,8 +31,9 @@ import (
 // GatherPools collects information about all resource pools which provide
 // devices that are accessible from the given node.
 //
-// Out-dated slices are silently ignored. Pools may be incomplete, which is
-// recorded in the result.
+// Out-dated slices are silently ignored. Pools may be incomplete (not all
+// required slices available) or invalid (for example, device names not unique).
+// Both are recorded in the result.
 func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceLister, node *v1.Node) ([]*Pool, error) {
 	pools := make(map[PoolID]*Pool)
@@ -75,6 +77,7 @@ func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceL
 	result := make([]*Pool, 0, len(pools))
 	for _, pool := range pools {
 		pool.IsIncomplete = int64(len(pool.Slices)) != pool.Slices[0].Spec.Pool.ResourceSliceCount
+		pool.IsInvalid, pool.InvalidReason = poolIsInvalid(pool)
 		result = append(result, pool)
 	}
@@ -101,17 +104,32 @@ func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) {
 	if slice.Spec.Pool.Generation > pool.Slices[0].Spec.Pool.Generation {
 		// Newer, replaces all old slices.
-		pool.Slices = []*resourceapi.ResourceSlice{slice}
+		pool.Slices = nil
 	}
 
 	// Add to pool.
 	pool.Slices = append(pool.Slices, slice)
 }
 
+func poolIsInvalid(pool *Pool) (bool, string) {
+	devices := sets.New[string]()
+	for _, slice := range pool.Slices {
+		for _, device := range slice.Spec.Devices {
+			if devices.Has(device.Name) {
+				return true, fmt.Sprintf("duplicate device name %s", device.Name)
+			}
+			devices.Insert(device.Name)
+		}
+	}
+	return false, ""
+}
+
 type Pool struct {
 	PoolID
 	IsIncomplete bool
+	IsInvalid bool
+	InvalidReason string
 	Slices []*resourceapi.ResourceSlice
 }
 
 type PoolID struct {
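
Taken together, the pools.go changes give callers two cheap health signals per
pool. A hedged usage sketch from a consumer's point of view; the import path of
the package containing GatherPools is an assumption here, since it is not shown
in the hunks above:

package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"

	// Assumed location of the code shown in this diff.
	"k8s.io/dynamic-resource-allocation/structured"
)

// checkPools gathers all pools visible from a node and reports the ones a
// scheduler should not allocate from yet, based on the IsIncomplete and the
// newly added IsInvalid flags.
func checkPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceLister, node *v1.Node) error {
	pools, err := structured.GatherPools(ctx, sliceLister, node)
	if err != nil {
		return fmt.Errorf("gather pools: %w", err)
	}
	for _, pool := range pools {
		switch {
		case pool.IsInvalid:
			fmt.Printf("pool %v is invalid: %s\n", pool.PoolID, pool.InvalidReason)
		case pool.IsIncomplete:
			fmt.Printf("pool %v is incomplete: %d of %d slices seen\n", pool.PoolID, len(pool.Slices), pool.Slices[0].Spec.Pool.ResourceSliceCount)
		}
	}
	return nil
}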