Complete the feature implementation: fix remaining issues, add perDeviceNodeSelection support, add tests, and address review comments.

Cici Huang 2025-03-12 17:39:36 -07:00
parent ecba6cde1d
commit 6d7f11689d
8 changed files with 445 additions and 204 deletions
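In short, a ResourceSlice can now delegate node selection to each device (Spec.PerDeviceNodeSelection) and publish shared counter sets (Spec.SharedCounters) that partitionable devices draw from via ConsumesCounter. A minimal sketch of such a slice, using only fields that appear in the hunks below (driver name, node name, and quantities are illustrative, not taken from this commit):

package example

import (
	resourceapi "k8s.io/api/resource/v1beta1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

func exampleSlice() *resourceapi.ResourceSlice {
	return &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "gpu-pool-slice-0"},
		Spec: resourceapi.ResourceSliceSpec{
			Driver:                 "driver.example.com",
			Pool:                   resourceapi.ResourcePool{Name: "pool-0"},
			PerDeviceNodeSelection: ptr.To(true), // node selection moves down to each device
			SharedCounters: []resourceapi.CounterSet{{
				Name:     "gpu-0-counters",
				Counters: map[string]resourceapi.Counter{"memory": {Value: resource.MustParse("8Gi")}},
			}},
			Devices: []resourceapi.Device{{
				Name: "gpu-0-partition-0",
				Basic: &resourceapi.BasicDevice{
					NodeName: ptr.To("node-1"), // only honored when PerDeviceNodeSelection is true
					ConsumesCounter: []resourceapi.DeviceCounterConsumption{{
						SharedCounter: "gpu-0-counters",
						Counters:      map[string]resourceapi.Counter{"memory": {Value: resource.MustParse("4Gi")}},
					}},
				},
			}},
		},
	}
}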

View File

@ -102,10 +102,11 @@ type informationForClaim struct {
// DynamicResources is a plugin that ensures that ResourceClaims are allocated.
type DynamicResources struct {
enabled bool
enableAdminAccess bool
enablePrioritizedList bool
enableSchedulingQueueHint bool
enabled bool
enableAdminAccess bool
enablePrioritizedList bool
enableSchedulingQueueHint bool
enablePartitionableDevices bool
enableDeviceTaints bool
fh framework.Handle
@ -122,11 +123,12 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
}
pl := &DynamicResources{
enabled: true,
enableAdminAccess: fts.EnableDRAAdminAccess,
enableDeviceTaints: fts.EnableDRADeviceTaints,
enablePrioritizedList: fts.EnableDRAPrioritizedList,
enableSchedulingQueueHint: fts.EnableSchedulingQueueHint,
enabled: true,
enableAdminAccess: fts.EnableDRAAdminAccess,
enableDeviceTaints: fts.EnableDRADeviceTaints,
enablePrioritizedList: fts.EnableDRAPrioritizedList,
enableSchedulingQueueHint: fts.EnableSchedulingQueueHint,
enablePartitionableDevices: fts.EnablePartitionableDevices,
fh: fh,
clientset: fh.ClientSet(),
@ -455,8 +457,9 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
return nil, statusError(logger, err)
}
features := structured.Features{
AdminAccess: pl.enableAdminAccess,
PrioritizedList: pl.enablePrioritizedList,
AdminAccess: pl.enableAdminAccess,
PrioritizedList: pl.enablePrioritizedList,
PartitionableDevices: pl.enablePartitionableDevices,
DeviceTaints: pl.enableDeviceTaints,
}
allocator, err := structured.NewAllocator(ctx, features, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)

View File

@ -33,5 +33,6 @@ type Features struct {
EnableSchedulingQueueHint bool
EnableAsyncPreemption bool
EnablePodLevelResources bool
EnablePartitionableDevices bool
EnableStorageCapacityScoring bool
}

View File

@ -59,6 +59,7 @@ func NewInTreeRegistry() runtime.Registry {
EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
EnableAsyncPreemption: feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption),
EnablePodLevelResources: feature.DefaultFeatureGate.Enabled(features.PodLevelResources),
EnablePartitionableDevices: feature.DefaultFeatureGate.Enabled(features.DRAPartitionableDevices),
EnableStorageCapacityScoring: feature.DefaultFeatureGate.Enabled(features.StorageCapacityScoring),
}

View File

@ -141,7 +141,9 @@ type Pool struct {
// Slice is turned into one ResourceSlice by the controller.
type Slice struct {
// Devices lists all devices which are part of the slice.
Devices []resourceapi.Device
Devices []resourceapi.Device
SharedCounters []resourceapi.CounterSet
PerDeviceNodeSelection *bool
}
// +k8s:deepcopy-gen=true
@ -548,7 +550,9 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
if !apiequality.Semantic.DeepEqual(&currentSlice.Spec.Pool, &desiredPool) ||
!apiequality.Semantic.DeepEqual(currentSlice.Spec.NodeSelector, pool.NodeSelector) ||
currentSlice.Spec.AllNodes != desiredAllNodes ||
!DevicesDeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) {
!DevicesDeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) ||
!apiequality.Semantic.DeepEqual(currentSlice.Spec.SharedCounters, pool.Slices[i].SharedCounters) ||
!apiequality.Semantic.DeepEqual(currentSlice.Spec.PerDeviceNodeSelection, pool.Slices[i].PerDeviceNodeSelection) {
changedDesiredSlices.Insert(i)
logger.V(5).Info("Need to update slice", "slice", klog.KObj(currentSlice), "matchIndex", i)
}
@ -588,6 +592,8 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
// have listed the existing slice.
slice.Spec.NodeSelector = pool.NodeSelector
slice.Spec.AllNodes = desiredAllNodes
slice.Spec.SharedCounters = pool.Slices[i].SharedCounters
slice.Spec.PerDeviceNodeSelection = pool.Slices[i].PerDeviceNodeSelection
// Preserve TimeAdded from existing device, if there is a matching device and taint.
slice.Spec.Devices = copyTaintTimeAdded(slice.Spec.Devices, pool.Slices[i].Devices)
@ -639,12 +645,14 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
GenerateName: generateName,
},
Spec: resourceapi.ResourceSliceSpec{
Driver: c.driverName,
Pool: desiredPool,
NodeName: nodeName,
NodeSelector: pool.NodeSelector,
AllNodes: desiredAllNodes,
Devices: pool.Slices[i].Devices,
Driver: c.driverName,
Pool: desiredPool,
NodeName: nodeName,
NodeSelector: pool.NodeSelector,
AllNodes: desiredAllNodes,
Devices: pool.Slices[i].Devices,
SharedCounters: pool.Slices[i].SharedCounters,
PerDeviceNodeSelection: pool.Slices[i].PerDeviceNodeSelection,
},
}
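On the driver-facing side the controller mirrors these fields: a driver sets them on its Slice, and syncPool both copies them into the published ResourceSlice (above) and treats any difference as a reason to update the slice. A brief sketch of what a driver might hand to the controller, assuming the Pool and Slice types from this file plus k8s.io/utils/ptr (counter names and quantities are illustrative):

func examplePool(devices []resourceapi.Device) Pool {
	return Pool{
		Slices: []Slice{{
			Devices:                devices, // one entry per device/partition, as before
			PerDeviceNodeSelection: ptr.To(true),
			SharedCounters: []resourceapi.CounterSet{{
				Name:     "gpu-0-counters",
				Counters: map[string]resourceapi.Counter{"memory": {Value: resource.MustParse("40Gi")}},
			}},
		}},
	}
}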

View File

@ -27,7 +27,6 @@ import (
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/dynamic-resource-allocation/api"
draapi "k8s.io/dynamic-resource-allocation/api"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/dynamic-resource-allocation/resourceclaim"
@ -61,6 +60,7 @@ type Features struct {
AdminAccess bool
PrioritizedList bool
PartitionableDevices bool
DeviceTaints bool
}
// NewAllocator returns an allocator for a certain set of claims or an error if
@ -118,6 +118,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
Allocator: a,
ctx: ctx, // all methods share the same a and thus ctx
logger: klog.FromContext(ctx),
node: node,
deviceMatchesRequest: make(map[matchKey]bool),
constraints: make([][]constraint, len(a.claimsToAllocate)),
requestData: make(map[requestIndices]requestData),
@ -127,7 +128,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
defer alloc.logger.V(5).Info("Done with allocation", "success", len(finalResult) == len(alloc.claimsToAllocate), "err", finalErr)
// First determine all eligible pools.
pools, err := GatherPools(ctx, alloc.slices, node, a.features.PartitionableDevices)
pools, err := GatherPools(ctx, alloc.slices, node, a.features)
if err != nil {
return nil, fmt.Errorf("gather pool information: %w", err)
}
@ -175,7 +176,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
// Error out if the prioritizedList feature is not enabled and the request
// has subrequests. This is to avoid surprising behavior for users.
if !a.prioritizedListEnabled && hasSubRequests {
if !a.features.PrioritizedList && hasSubRequests {
return nil, fmt.Errorf("claim %s, request %s: has subrequests, but the DRAPrioritizedList feature is disabled", klog.KObj(claim), request.Name)
}
@ -385,7 +386,7 @@ func (a *allocator) validateDeviceRequest(request requestAccessor, parentRequest
}
}
if !a.adminAccessEnabled && request.hasAdminAccess() {
if !a.features.AdminAccess && request.hasAdminAccess() {
return requestData, fmt.Errorf("claim %s, request %s: admin access is requested, but the feature is disabled", klog.KObj(claim), request.name())
}
@ -462,6 +463,7 @@ type allocator struct {
*Allocator
ctx context.Context
logger klog.Logger
node *v1.Node
pools []*Pool
deviceMatchesRequest map[matchKey]bool
constraints [][]constraint // one list of constraints per claim
@ -514,9 +516,9 @@ type requestData struct {
}
type deviceWithID struct {
id DeviceID
device *draapi.Device
slice *draapi.ResourceSlice
id DeviceID
basic *draapi.BasicDevice
slice *draapi.ResourceSlice
}
type internalAllocationResult struct {
@ -527,7 +529,7 @@ type internalDeviceResult struct {
request string // name of the request (if no subrequests) or the subrequest
parentRequest string // name of the request which contains the subrequest, empty otherwise
id DeviceID
device *draapi.Device
basic *draapi.BasicDevice
slice *draapi.ResourceSlice
adminAccess *bool
}
@ -623,7 +625,7 @@ func (m *matchAttributeConstraint) add(requestName, subRequestName string, devic
return true
}
func (m *matchAttributeConstraint) remove(requestName, subRequestName string, device *draapi.Device, deviceID DeviceID) {
func (m *matchAttributeConstraint) remove(requestName, subRequestName string, device *draapi.BasicDevice, deviceID DeviceID) {
if m.requestNames.Len() > 0 && !m.matches(requestName, subRequestName) {
// Device not affected by constraint.
return
@ -642,7 +644,7 @@ func (m *matchAttributeConstraint) matches(requestName, subRequestName string) b
}
}
func lookupAttribute(device *draapi.Device, deviceID DeviceID, attributeName draapi.FullyQualifiedName) *draapi.DeviceAttribute {
func lookupAttribute(device *draapi.BasicDevice, deviceID DeviceID, attributeName draapi.FullyQualifiedName) *draapi.DeviceAttribute {
// Fully-qualified match?
if attr, ok := device.Attributes[draapi.QualifiedName(attributeName)]; ok {
return &attr
@ -809,9 +811,9 @@ func (alloc *allocator) allocateOne(r deviceIndices, allocateSubRequest bool) (b
// Finally treat as allocated and move on to the next device.
device := deviceWithID{
id: deviceID,
device: &slice.Spec.Devices[deviceIndex],
slice: slice,
id: deviceID,
basic: slice.Spec.Devices[deviceIndex].Basic,
slice: slice,
}
allocated, deallocate, err := alloc.allocateDevice(r, device, false)
if err != nil {
@ -885,12 +887,31 @@ func (alloc *allocator) isSelectable(r requestIndices, requestData requestData,
return false, nil
}
if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) {
var nodeName string
var allNodes bool
if device.NodeName != nil {
nodeName = *device.NodeName
}
if device.AllNodes != nil {
allNodes = *device.AllNodes
}
matches, err := nodeMatches(alloc.node, nodeName, allNodes, device.NodeSelector)
if err != nil {
return false, err
}
if !matches {
alloc.deviceMatchesRequest[matchKey] = false
return false, nil
}
}
alloc.deviceMatchesRequest[matchKey] = true
return true, nil
}
func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.Device, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
for i, selector := range selectors {
expr := alloc.celCache.GetOrCompile(selector.CEL.Expression)
if expr.Error != nil {
@ -905,15 +926,13 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.Device,
return false, fmt.Errorf("claim %s: selector #%d: CEL compile error: %w", klog.KObj(alloc.claimsToAllocate[r.claimIndex]), i, expr.Error)
}
attributes := make(map[resourceapi.QualifiedName]resourceapi.DeviceAttribute)
if err := draapi.Convert_api_Attributes_To_v1beta1_Attributes(device.Attributes, attributes); err != nil {
return false, fmt.Errorf("convert attributes: %w", err)
// If this conversion turns out to be expensive, the CEL package could be converted
// to use unique strings.
var d resourceapi.BasicDevice
if err := draapi.Convert_api_BasicDevice_To_v1beta1_BasicDevice(device, &d, nil); err != nil {
return false, fmt.Errorf("convert BasicDevice: %w", err)
}
capacity := make(map[resourceapi.QualifiedName]resourceapi.DeviceCapacity)
if err := draapi.Convert_api_Capacity_To_v1beta1_Capacity(device.Capacity, capacity); err != nil {
return false, fmt.Errorf("convert capacity: %w", err)
}
matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: attributes, Capacity: capacity})
matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: d.Attributes, Capacity: d.Capacity})
if class != nil {
alloc.logger.V(7).Info("CEL result", "device", deviceID, "class", klog.KObj(class), "selector", i, "expression", selector.CEL.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err)
} else {
@ -953,15 +972,18 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
return false, nil, nil
}
// If a device consumes capacity from a capacity pool, verify that
// there is sufficient capacity available.
ok, err := alloc.checkAvailableCapacity(device)
if err != nil {
return false, nil, err
}
if !ok {
alloc.logger.V(7).Info("Insufficient capacity", "device", device.id)
return false, nil, nil
// API validation guarantees that every ConsumesCounter entry refers to a counter set defined in SharedCounters.
if alloc.features.PartitionableDevices && len(device.basic.ConsumesCounter) > 0 {
// If a device consumes counters from a shared counter set, verify that
// enough of each referenced counter is still available.
ok, err := alloc.checkAvailableCapacity(device)
if err != nil {
return false, nil, err
}
if !ok {
alloc.logger.V(7).Info("Insufficient capacity", "device", device.id)
return false, nil, nil
}
}
var parentRequestName string
@ -977,13 +999,13 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
// Might be tainted, in which case the taint has to be tolerated.
// The check is skipped if the feature is disabled.
if alloc.deviceTaintsEnabled && !allTaintsTolerated(device.basic, request) {
if alloc.features.DeviceTaints && !allTaintsTolerated(device.basic, request) {
return false, nil, nil
}
// It's available. Now check constraints.
for i, constraint := range alloc.constraints[r.claimIndex] {
added := constraint.add(baseRequestName, subRequestName, device.device, device.id)
added := constraint.add(baseRequestName, subRequestName, device.basic, device.id)
if !added {
if must {
// It does not make sense to declare a claim where a constraint prevents getting
@ -993,7 +1015,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
// Roll back for all previous constraints before we return.
for e := 0; e < i; e++ {
alloc.constraints[r.claimIndex][e].remove(baseRequestName, subRequestName, device.device, device.id)
alloc.constraints[r.claimIndex][e].remove(baseRequestName, subRequestName, device.basic, device.id)
}
return false, nil, nil
}
@ -1009,7 +1031,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
request: request.name(),
parentRequest: parentRequestName,
id: device.id,
device: device.device,
basic: device.basic,
slice: device.slice,
}
if request.adminAccess() {
@ -1020,7 +1042,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
return true, func() {
for _, constraint := range alloc.constraints[r.claimIndex] {
constraint.remove(baseRequestName, subRequestName, device.device, device.id)
constraint.remove(baseRequestName, subRequestName, device.basic, device.id)
}
if !request.adminAccess() {
alloc.allocatingDevices[device.id] = false
@ -1052,23 +1074,24 @@ func taintTolerated(taint resourceapi.DeviceTaint, request requestAccessor) bool
func (alloc *allocator) checkAvailableCapacity(device deviceWithID) (bool, error) {
slice := device.slice
referencedCapacityPools := sets.New[api.UniqueString]()
for _, consumedCapacity := range device.device.ConsumesCapacity {
referencedCapacityPools.Insert(consumedCapacity.CapacityPool)
referencedSharedCounters := sets.New[draapi.UniqueString]()
for _, consumedCounter := range device.basic.ConsumesCounter {
referencedSharedCounters.Insert(consumedCounter.SharedCounter)
}
// Create a structure that captures the initial capacity for all pools
// Create a structure that captures the initial counter for all sharedCounters
// referenced by the device.
availableCapacities := make(map[api.UniqueString]map[api.QualifiedName]api.DeviceCapacity)
for _, capacityPool := range slice.Spec.CapacityPools {
if !referencedCapacityPools.Has(capacityPool.Name) {
availableCounters := make(map[draapi.UniqueString]map[string]draapi.Counter)
for _, counterSet := range slice.Spec.SharedCounters {
if !referencedSharedCounters.Has(counterSet.Name) {
// Skip counter sets not referenced by this device; API validation ensures that referenced counter sets exist in SharedCounters.
continue
}
poolCapacity := make(map[api.QualifiedName]api.DeviceCapacity)
for name, cap := range capacityPool.Capacity {
poolCapacity[name] = cap
counterShared := make(map[string]draapi.Counter, len(counterSet.Counters))
for name, cap := range counterSet.Counters {
counterShared[name] = cap
}
availableCapacities[capacityPool.Name] = poolCapacity
availableCounters[counterSet.Name] = counterShared
}
// Update the data structure to reflect capacity already in use.
@ -1078,30 +1101,30 @@ func (alloc *allocator) checkAvailableCapacity(device deviceWithID) (bool, error
Pool: slice.Spec.Pool.Name,
Device: device.Name,
}
if !(alloc.allocatedDevices.Has(deviceID) || alloc.allocatingDevices[deviceID]) {
if !alloc.allocatedDevices.Has(deviceID) && !alloc.allocatingDevices[deviceID] {
continue
}
for _, consumedCapacity := range device.ConsumesCapacity {
poolCapacity := availableCapacities[consumedCapacity.CapacityPool]
for name, cap := range consumedCapacity.Capacity {
existingCap, ok := poolCapacity[name]
for _, consumedCounter := range device.Basic.ConsumesCounter {
counterShared := availableCounters[consumedCounter.SharedCounter]
for name, cap := range consumedCounter.Counters {
existingCap, ok := counterShared[name]
if !ok {
// Just continue for now, but this probably should be an error.
// API validation ensures that referenced counters exist in the shared counter set.
continue
}
// This can potentially result in negative available capacity. That is fine,
// we just treat it as no capacity available.
existingCap.Value.Sub(cap.Value)
poolCapacity[name] = existingCap
counterShared[name] = existingCap
}
}
}
// Check if all consumed capacities for the device can be satisfied.
for _, deviceConsumedCapacity := range device.device.ConsumesCapacity {
poolCapacity := availableCapacities[deviceConsumedCapacity.CapacityPool]
for name, cap := range deviceConsumedCapacity.Capacity {
availableCap, found := poolCapacity[name]
for _, deviceConsumedCounter := range device.basic.ConsumesCounter {
counterShared := availableCounters[deviceConsumedCounter.SharedCounter]
for name, cap := range deviceConsumedCounter.Counters {
availableCap, found := counterShared[name]
// If the device requests a capacity that doesn't exist in
// the pool, it can not be allocated.
if !found {
@ -1131,9 +1154,11 @@ func (alloc *allocator) createNodeSelector(result []internalDeviceResult) (*v1.N
slice := result[i].slice
var nodeName draapi.UniqueString
var nodeSelector *v1.NodeSelector
if slice.Spec.PerDeviceNodeSelection {
nodeName = result[i].device.NodeName
nodeSelector = result[i].device.NodeSelector
if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) {
if result[i].basic.NodeName != nil {
nodeName = draapi.MakeUniqueString(*result[i].basic.NodeName)
}
nodeSelector = result[i].basic.NodeSelector
} else {
nodeName = slice.Spec.NodeName
nodeSelector = slice.Spec.NodeSelector
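To make the counter accounting in checkAvailableCapacity concrete, here is a small worked sketch using the same Quantity arithmetic from k8s.io/apimachinery/pkg/api/resource (quantities are illustrative, not from this commit): with a shared counter set exposing 8Gi of memory and an already-allocated partition consuming 4Gi of it, a candidate partition consuming 6Gi is rejected, while one consuming 4Gi would still fit.

func exampleCounterCheck() bool {
	available := resource.MustParse("8Gi") // counter value from the CounterSet
	already := resource.MustParse("4Gi")   // already consumed by an allocated or allocating device
	requested := resource.MustParse("6Gi") // what the candidate device would consume

	available.Sub(already)               // 8Gi - 4Gi = 4Gi left; may go negative, which counts as nothing left
	return available.Cmp(requested) >= 0 // false here: 4Gi < 6Gi, so checkAvailableCapacity reports insufficient capacity
}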

View File

@ -253,31 +253,29 @@ const (
fromDeviceCapacityConsumption = "fromDeviceCapacityConsumption"
)
func compositeDevice(name string, capacity any, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute,
consumesCapacity ...resourceapi.DeviceCapacityConsumption) resourceapi.Device {
func partitionableDevice(name string, capacity any, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute,
consumesCapacity ...resourceapi.DeviceCounterConsumption) resourceapi.Device {
device := resourceapi.Device{
Name: name,
Composite: &resourceapi.CompositeDevice{
Basic: &resourceapi.BasicDevice{
Attributes: attributes,
},
}
switch capacity := capacity.(type) {
case map[resourceapi.QualifiedName]resource.Quantity:
device.Composite.Capacity = toDeviceCapacity(capacity)
device.Basic.Capacity = toDeviceCapacity(capacity)
case string:
if capacity == fromDeviceCapacityConsumption {
c := make(map[resourceapi.QualifiedName]resourceapi.DeviceCapacity)
for _, dcc := range consumesCapacity {
for name, cap := range dcc.Capacity {
if _, found := c[name]; found {
panic(fmt.Sprintf("same capacity found in multiple device capacity consumptions %q", name))
}
c[name] = cap
for name, cap := range dcc.Counters {
ccap := resourceapi.DeviceCapacity(cap)
c[resourceapi.QualifiedName(name)] = ccap
}
}
device.Composite.Capacity = c
device.Basic.Capacity = c
} else {
panic(fmt.Sprintf("unexpected capacity value %q", capacity))
}
@ -287,29 +285,32 @@ func compositeDevice(name string, capacity any, attributes map[resourceapi.Quali
panic(fmt.Sprintf("unexpected capacity type %T: %+v", capacity, capacity))
}
device.Composite.ConsumesCapacity = consumesCapacity
device.Basic.ConsumesCounter = consumesCapacity
return device
}
func compositeDeviceWithNodeSelector(name string, nodeSelection any, capacity any, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute,
consumesCapacity ...resourceapi.DeviceCapacityConsumption) resourceapi.Device {
device := compositeDevice(name, capacity, attributes, consumesCapacity...)
func partitionableDeviceWithNodeSelector(name string, nodeSelection any, capacity any, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute,
consumesCapacity ...resourceapi.DeviceCounterConsumption) resourceapi.Device {
device := partitionableDevice(name, capacity, attributes, consumesCapacity...)
switch nodeSelection := nodeSelection.(type) {
case *v1.NodeSelector:
device.Composite.NodeSelector = nodeSelection
device.Basic.NodeSelector = nodeSelection
case string:
if nodeSelection == nodeSelectionAll {
device.Composite.AllNodes = true
device.Basic.AllNodes = func() *bool {
r := true
return &r
}()
} else if nodeSelection == nodeSelectionPerDevice {
panic("nodeSelectionPerDevice is not supported for devices")
} else {
device.Composite.NodeName = nodeSelection
device.Basic.NodeName = &nodeSelection
}
default:
panic(fmt.Sprintf("unexpected nodeSelection type %T: %+v", nodeSelection, nodeSelection))
}
return wrapDevice(device)
return device
}
type wrapDevice resourceapi.Device
@ -325,10 +326,10 @@ func (in wrapDevice) withTaints(taints ...resourceapi.DeviceTaint) wrapDevice {
return wrapDevice(*device)
}
func deviceCapacityConsumption(capacityPool string, capacity map[resourceapi.QualifiedName]resource.Quantity) resourceapi.DeviceCapacityConsumption {
return resourceapi.DeviceCapacityConsumption{
CapacityPool: capacityPool,
Capacity: toDeviceCapacity(capacity),
func deviceCapacityConsumption(capacityPool string, capacity map[resourceapi.QualifiedName]resource.Quantity) resourceapi.DeviceCounterConsumption {
return resourceapi.DeviceCounterConsumption{
SharedCounter: capacityPool,
Counters: toDeviceCounter(capacity),
}
}
@ -367,7 +368,10 @@ func slice(name string, nodeSelection any, pool, driver string, devices ...wrapD
if nodeSelection == nodeSelectionAll {
slice.Spec.AllNodes = true
} else if nodeSelection == nodeSelectionPerDevice {
slice.Spec.PerDeviceNodeSelection = true
slice.Spec.PerDeviceNodeSelection = func() *bool {
r := true
return &r
}()
} else {
slice.Spec.NodeName = nodeSelection
}
@ -526,17 +530,17 @@ func sliceWithMultipleDevices(name string, nodeSelection any, pool, driver strin
return slice(name, nodeSelection, pool, driver, devices...)
}
func sliceWithCapacityPools(name string, nodeSelection any, pool, driver string, capacityPools []resourceapi.CapacityPool, devices ...resourceapi.Device) *resourceapi.ResourceSlice {
func sliceWithCapacityPools(name string, nodeSelection any, pool, driver string, sharedCounters []resourceapi.CounterSet, devices ...resourceapi.Device) *resourceapi.ResourceSlice {
slice := slice(name, nodeSelection, pool, driver)
slice.Spec.CapacityPools = capacityPools
slice.Spec.SharedCounters = sharedCounters
slice.Spec.Devices = devices
return slice
}
func capacityPool(name string, capacity map[resourceapi.QualifiedName]resource.Quantity) resourceapi.CapacityPool {
return resourceapi.CapacityPool{
func counterSet(name string, capacity map[resourceapi.QualifiedName]resource.Quantity) resourceapi.CounterSet {
return resourceapi.CounterSet{
Name: name,
Capacity: toDeviceCapacity(capacity),
Counters: toDeviceCounter(capacity),
}
}
@ -548,6 +552,14 @@ func toDeviceCapacity(capacity map[resourceapi.QualifiedName]resource.Quantity)
return out
}
func toDeviceCounter(capacity map[resourceapi.QualifiedName]resource.Quantity) map[string]resourceapi.Counter {
out := make(map[string]resourceapi.Counter, len(capacity))
for name, quantity := range capacity {
out[string(name)] = resourceapi.Counter{Value: quantity}
}
return out
}
func TestAllocator(t *testing.T) {
nonExistentAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "NonExistentAttribute")
boolAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "boolAttribute")
@ -1141,7 +1153,9 @@ func TestAllocator(t *testing.T) {
)},
},
"all-devices-slice-without-devices-prioritized-list": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
func() wrapResourceClaim {
claim := claimWithRequests(claim0, nil,
@ -1167,7 +1181,9 @@ func TestAllocator(t *testing.T) {
)},
},
"all-devices-no-slices-prioritized-list": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
func() wrapResourceClaim {
claim := claimWithRequests(claim0, nil,
@ -1192,7 +1208,9 @@ func TestAllocator(t *testing.T) {
)},
},
"all-devices-some-allocated-prioritized-list": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
func() wrapResourceClaim {
claim := claimWithRequests(claim0, nil,
@ -1647,6 +1665,18 @@ func TestAllocator(t *testing.T) {
expectError: gomega.MatchError(gomega.ContainSubstring("empty constraint (unsupported constraint type?)")),
},
"unknown-device": {
claimsToAllocate: objects(claim(claim0, req0, classA)),
classes: objects(class(classA, driverA)),
slices: objects(
func() *resourceapi.ResourceSlice {
slice := sliceWithOneDevice(slice1, node1, pool1, driverA)
slice.Spec.Devices[0].Basic = nil /* empty = unknown future extension */
return slice
}(),
),
node: node(node1, region1),
},
"invalid-CEL-one-device": {
claimsToAllocate: objects(
func() wrapResourceClaim {
@ -1724,7 +1754,9 @@ func TestAllocator(t *testing.T) {
expectError: gomega.MatchError(gomega.ContainSubstring("exceeds the claim limit")),
},
"prioritized-list-first-unavailable": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0,
subRequest(subReq0, classB, 1),
subRequest(subReq1, classA, 1),
@ -1739,7 +1771,9 @@ func TestAllocator(t *testing.T) {
)},
},
"prioritized-list-non-available": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0,
subRequest(subReq0, classB, 2),
subRequest(subReq1, classA, 2),
@ -1754,7 +1788,9 @@ func TestAllocator(t *testing.T) {
expectResults: nil,
},
"prioritized-list-device-config": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
claimWithAll(claim0,
objects(
@ -1795,7 +1831,9 @@ func TestAllocator(t *testing.T) {
)},
},
"prioritized-list-class-config": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0,
subRequest(subReq0, classA, 2),
subRequest(subReq1, classB, 2),
@ -1831,7 +1869,9 @@ func TestAllocator(t *testing.T) {
)},
},
"prioritized-list-subrequests-with-expressions": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
claimWithRequests(claim0, nil,
request(req0, classA, 1, resourceapi.DeviceSelector{
@ -1873,7 +1913,9 @@ func TestAllocator(t *testing.T) {
)},
},
"prioritized-list-subrequests-with-constraints-ref-parent-request": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
claimWithRequests(claim0,
[]resourceapi.DeviceConstraint{
@ -1941,7 +1983,9 @@ func TestAllocator(t *testing.T) {
)},
},
"prioritized-list-subrequests-with-constraints-ref-sub-request": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
claimWithRequests(claim0,
[]resourceapi.DeviceConstraint{
@ -2001,7 +2045,9 @@ func TestAllocator(t *testing.T) {
)},
},
"prioritized-list-subrequests-with-allocation-mode-all": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
func() wrapResourceClaim {
claim := claimWithRequests(claim0, nil,
@ -2033,7 +2079,9 @@ func TestAllocator(t *testing.T) {
)},
},
"prioritized-list-allocation-mode-all-multiple-requests": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
claimWithRequests(claim0, nil,
request(req0, classA, 1),
@ -2064,7 +2112,9 @@ func TestAllocator(t *testing.T) {
)},
},
"prioritized-list-disabled": {
prioritizedList: false,
features: Features{
PrioritizedList: false,
},
claimsToAllocate: objects(
func() wrapResourceClaim {
claim := claimWithRequests(claim0, nil,
@ -2083,7 +2133,9 @@ func TestAllocator(t *testing.T) {
expectError: gomega.MatchError(gomega.ContainSubstring("claim claim-0, request req-0: has subrequests, but the DRAPrioritizedList feature is disabled")),
},
"prioritized-list-multi-request": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
claimWithRequests(claim0, nil,
request(req1, classA, 1, resourceapi.DeviceSelector{
@ -2132,7 +2184,9 @@ func TestAllocator(t *testing.T) {
)},
},
"prioritized-list-with-backtracking": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(
claimWithRequests(claim0, nil,
requestWithPrioritizedList(req0,
@ -2181,7 +2235,9 @@ func TestAllocator(t *testing.T) {
)},
},
"prioritized-list-too-many-in-first-subrequest": {
prioritizedList: true,
features: Features{
PrioritizedList: true,
},
claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0,
subRequest(subReq0, classB, 500),
subRequest(subReq1, classA, 1),
@ -2205,13 +2261,13 @@ func TestAllocator(t *testing.T) {
classes: objects(class(classA, driverA)),
slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA,
objects(
capacityPool(capacityPool1,
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("8Gi"),
},
),
),
compositeDevice(device1, nil, nil,
partitionableDevice(device1, nil, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
@ -2225,7 +2281,7 @@ func TestAllocator(t *testing.T) {
deviceAllocationResult(req0, driverA, pool1, device1, false),
)},
},
"partitionable-devices-multiple-devices": {
"partitionable-devices-prioritized-list": {
features: Features{
PrioritizedList: true,
PartitionableDevices: true,
@ -2250,13 +2306,13 @@ func TestAllocator(t *testing.T) {
classes: objects(class(classA, driverA)),
slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA,
objects(
capacityPool(capacityPool1,
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("8Gi"),
},
),
),
compositeDevice(device1,
partitionableDevice(device1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
}, nil,
@ -2266,7 +2322,7 @@ func TestAllocator(t *testing.T) {
},
),
),
compositeDevice(device2,
partitionableDevice(device2,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("6Gi"),
}, nil,
@ -2276,7 +2332,7 @@ func TestAllocator(t *testing.T) {
},
),
),
compositeDevice(device3, fromDeviceCapacityConsumption, nil,
partitionableDevice(device3, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
@ -2291,6 +2347,60 @@ func TestAllocator(t *testing.T) {
deviceAllocationResult(req1SubReq1, driverA, pool1, device3, false),
)},
},
"partitionable-devices-multiple-devices": {
features: Features{
PartitionableDevices: true,
},
claimsToAllocate: objects(
claimWithRequests(claim0, nil,
request(req0, classA, 1),
request(req1, classA, 1),
),
),
classes: objects(class(classA, driverA)),
slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA,
objects(
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("8Gi"),
},
),
),
partitionableDevice(device1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
}, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
),
),
partitionableDevice(device2,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("6Gi"),
}, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("6Gi"),
},
),
),
partitionableDevice(device3, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
),
),
)),
node: node(node1, region1),
expectResults: []any{allocationResult(
localNodeSelector(node1),
deviceAllocationResult(req0, driverA, pool1, device1, false),
deviceAllocationResult(req1, driverA, pool1, device3, false),
)},
},
"partitionable-devices-multiple-capacity-pools": {
features: Features{
PrioritizedList: true,
@ -2316,18 +2426,18 @@ func TestAllocator(t *testing.T) {
classes: objects(class(classA, driverA)),
slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA,
objects(
capacityPool(capacityPool1,
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("18Gi"),
},
),
capacityPool(capacityPool2,
counterSet(capacityPool2,
map[resourceapi.QualifiedName]resource.Quantity{
"cpus": resource.MustParse("8"),
},
),
),
compositeDevice(device1, fromDeviceCapacityConsumption, nil,
partitionableDevice(device1, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
@ -2339,7 +2449,7 @@ func TestAllocator(t *testing.T) {
},
),
),
compositeDevice(device2, fromDeviceCapacityConsumption, nil,
partitionableDevice(device2, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("6Gi"),
@ -2351,7 +2461,7 @@ func TestAllocator(t *testing.T) {
},
),
),
compositeDevice(device3, fromDeviceCapacityConsumption, nil,
partitionableDevice(device3, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
@ -2371,6 +2481,82 @@ func TestAllocator(t *testing.T) {
deviceAllocationResult(req1SubReq1, driverA, pool1, device3, false),
)},
},
"partitionable-devices-multiple-counters": {
features: Features{
PartitionableDevices: true,
},
claimsToAllocate: objects(
claimWithRequests(claim0, nil,
request(req0, classA, 1),
request(req1, classA, 1),
),
),
classes: objects(class(classA, driverA)),
slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA,
objects(
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"cpus": resource.MustParse("8"),
"memory": resource.MustParse("18Gi"),
},
),
counterSet(capacityPool2,
map[resourceapi.QualifiedName]resource.Quantity{
"cpus": resource.MustParse("12"),
"memory": resource.MustParse("18Gi"),
},
),
),
partitionableDevice(device1, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
"cpus": resource.MustParse("6"),
},
),
deviceCapacityConsumption(capacityPool2,
map[resourceapi.QualifiedName]resource.Quantity{
"cpus": resource.MustParse("4"),
"memory": resource.MustParse("2Gi"),
},
),
),
partitionableDevice(device2, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("6Gi"),
"cpus": resource.MustParse("4"),
},
),
deviceCapacityConsumption(capacityPool2,
map[resourceapi.QualifiedName]resource.Quantity{
"cpus": resource.MustParse("6"),
"memory": resource.MustParse("6Gi"),
},
),
),
partitionableDevice(device3, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
"cpus": resource.MustParse("4"),
},
),
deviceCapacityConsumption(capacityPool2,
map[resourceapi.QualifiedName]resource.Quantity{
"cpus": resource.MustParse("4"),
"memory": resource.MustParse("4Gi"),
},
),
),
)),
node: node(node1, region1),
expectResults: []any{allocationResult(
localNodeSelector(node1),
deviceAllocationResult(req0, driverA, pool1, device2, false),
deviceAllocationResult(req1, driverA, pool1, device3, false),
)},
},
"partitionable-devices-no-capacity-available": {
features: Features{
PartitionableDevices: true,
@ -2383,20 +2569,20 @@ func TestAllocator(t *testing.T) {
classes: objects(class(classA, driverA)),
slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA,
objects(
capacityPool(capacityPool1,
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("18Gi"),
},
),
),
compositeDevice(device1, fromDeviceCapacityConsumption, nil,
partitionableDevice(device1, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
),
),
compositeDevice(device2, fromDeviceCapacityConsumption, nil,
partitionableDevice(device2, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("16Gi"),
@ -2422,20 +2608,20 @@ func TestAllocator(t *testing.T) {
classes: objects(class(classA, driverA)),
slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA,
objects(
capacityPool(capacityPool1,
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("18Gi"),
},
),
),
compositeDevice(device1, fromDeviceCapacityConsumption, nil,
partitionableDevice(device1, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
),
),
compositeDevice(device2, fromDeviceCapacityConsumption, nil,
partitionableDevice(device2, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("20Gi"),
@ -2461,13 +2647,13 @@ func TestAllocator(t *testing.T) {
classes: objects(class(classA, driverA)),
slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA,
objects(
capacityPool(capacityPool1,
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("18Gi"),
},
),
),
compositeDevice(device1, nil, nil,
partitionableDevice(device1, nil, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
@ -2502,20 +2688,20 @@ func TestAllocator(t *testing.T) {
classes: objects(class(classA, driverA)),
slices: objects(sliceWithCapacityPools(slice1, nodeSelectionPerDevice, pool1, driverA,
objects(
capacityPool(capacityPool1,
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("18Gi"),
},
),
),
compositeDeviceWithNodeSelector(device1, node1, fromDeviceCapacityConsumption, nil,
partitionableDeviceWithNodeSelector(device1, node1, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
),
),
compositeDeviceWithNodeSelector(device2, node2, fromDeviceCapacityConsumption, nil,
partitionableDeviceWithNodeSelector(device2, node2, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("6Gi"),
@ -2540,13 +2726,13 @@ func TestAllocator(t *testing.T) {
classes: objects(class(classA, driverA)),
slices: objects(sliceWithCapacityPools(slice1, nodeSelectionPerDevice, pool1, driverA,
objects(
capacityPool(capacityPool1,
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("18Gi"),
},
),
),
compositeDeviceWithNodeSelector(device1, nodeLabelSelector(regionKey, region1), fromDeviceCapacityConsumption, nil,
partitionableDeviceWithNodeSelector(device1, nodeLabelSelector(regionKey, region1), fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
@ -2580,27 +2766,27 @@ func TestAllocator(t *testing.T) {
classes: objects(class(classA, driverA), class(classB, driverB)),
slices: objects(sliceWithCapacityPools(slice1, nodeSelectionPerDevice, pool1, driverA,
objects(
capacityPool(capacityPool1,
counterSet(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("18Gi"),
},
),
),
compositeDeviceWithNodeSelector(device1, nodeLabelSelector(regionKey, region1), fromDeviceCapacityConsumption, nil,
partitionableDeviceWithNodeSelector(device1, nodeLabelSelector(regionKey, region1), fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
),
),
compositeDeviceWithNodeSelector(device2, node1, fromDeviceCapacityConsumption, nil,
partitionableDeviceWithNodeSelector(device2, node1, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
),
),
compositeDeviceWithNodeSelector(device3, nodeSelectionAll, fromDeviceCapacityConsumption, nil,
partitionableDeviceWithNodeSelector(device3, nodeSelectionAll, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool1,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
@ -2609,27 +2795,27 @@ func TestAllocator(t *testing.T) {
),
), sliceWithCapacityPools(slice2, node1, pool2, driverB,
objects(
capacityPool(capacityPool2,
counterSet(capacityPool2,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("12Gi"),
},
),
),
compositeDevice(device1, fromDeviceCapacityConsumption, nil,
partitionableDevice(device1, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool2,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
),
),
compositeDevice(device2, fromDeviceCapacityConsumption, nil,
partitionableDevice(device2, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool2,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
),
),
compositeDevice(device3, fromDeviceCapacityConsumption, nil,
partitionableDevice(device3, fromDeviceCapacityConsumption, nil,
deviceCapacityConsumption(capacityPool2,
map[resourceapi.QualifiedName]resource.Quantity{
"memory": resource.MustParse("4Gi"),
@ -2710,7 +2896,7 @@ func TestAllocator(t *testing.T) {
},
"tainted-disabled": {
features: Features{
DeviceTaints: true,
DeviceTaints: false,
},
claimsToAllocate: objects(claim(claim0, req0, classA)),
classes: objects(class(classA, driverA)),
@ -2725,7 +2911,7 @@ func TestAllocator(t *testing.T) {
},
"tainted-prioritized-list": {
features: Features{
DeviceTaints: true,
DeviceTaints: true,
PrioritizedList: true,
},
claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0,
@ -2740,7 +2926,7 @@ func TestAllocator(t *testing.T) {
},
"tainted-prioritized-list-disabled": {
features: Features{
DeviceTaints: false,
DeviceTaints: false,
PrioritizedList: true,
},
claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0,
@ -2761,7 +2947,7 @@ func TestAllocator(t *testing.T) {
"tainted-admin-access": {
features: Features{
DeviceTaints: true,
PrioritizedList: true,
AdminAccess: true,
},
claimsToAllocate: func() []wrapResourceClaim {
c := claim(claim0, req0, classA)
@ -2781,7 +2967,7 @@ func TestAllocator(t *testing.T) {
"tainted-admin-access-disabled": {
features: Features{
DeviceTaints: false,
AdminAccess: true,
AdminAccess: true,
},
claimsToAllocate: func() []wrapResourceClaim {
c := claim(claim0, req0, classA)

View File

@ -27,48 +27,72 @@ import (
draapi "k8s.io/dynamic-resource-allocation/api"
)
func nodeMatches(node *v1.Node, nodeNameToMatch string, allNodesMatch bool, nodeSelector *v1.NodeSelector) (bool, error) {
switch {
case nodeNameToMatch != "":
return node != nil && node.Name == nodeNameToMatch, nil
case allNodesMatch:
return true, nil
case nodeSelector != nil:
selector, err := nodeaffinity.NewNodeSelector(nodeSelector)
if err != nil {
return false, fmt.Errorf("failed to parse node selector %s: %w", nodeSelector.String(), err)
}
return selector.Match(node), nil
}
return false, nil
}
// GatherPools collects information about all resource pools which provide
// devices that are accessible from the given node.
//
// Outdated slices are silently ignored. Pools may be incomplete (not all
// required slices available) or invalid (for example, device names not unique).
// Both are recorded in the result.
func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node *v1.Node, partitionableDevicesEnabled bool) ([]*Pool, error) {
func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node *v1.Node, features Features) ([]*Pool, error) {
pools := make(map[PoolID]*Pool)
nodeName := ""
if node != nil {
nodeName = node.Name
}
for _, slice := range slices {
if !features.PartitionableDevices && (len(slice.Spec.SharedCounters) > 0 || slice.Spec.PerDeviceNodeSelection != nil) {
continue
}
switch {
case slice.Spec.NodeName != "":
if slice.Spec.NodeName == nodeName {
if err := addSlice(pools, slice, node, partitionableDevicesEnabled); err != nil {
return nil, fmt.Errorf("add node slice %s: %w", slice.Name, err)
}
}
case slice.Spec.AllNodes:
if err := addSlice(pools, slice, node, partitionableDevicesEnabled); err != nil {
return nil, fmt.Errorf("add cluster slice %s: %w", slice.Name, err)
}
case slice.Spec.NodeSelector != nil:
// TODO: move conversion into api.
selector, err := nodeaffinity.NewNodeSelector(slice.Spec.NodeSelector)
case slice.Spec.NodeName != "" || slice.Spec.AllNodes || slice.Spec.NodeSelector != nil:
match, err := nodeMatches(node, slice.Spec.NodeName, slice.Spec.AllNodes, slice.Spec.NodeSelector)
if err != nil {
return nil, fmt.Errorf("node selector in resource slice %s: %w", slice.Name, err)
return nil, fmt.Errorf("failed to perform node selection for slice %s: %w", slice.Name, err)
}
if selector.Match(node) {
if err := addSlice(pools, slice, node, partitionableDevicesEnabled); err != nil {
return nil, fmt.Errorf("add matching slice %s: %w", slice.Name, err)
if match {
if err := addSlice(pools, slice); err != nil {
return nil, fmt.Errorf("failed to add node slice %s: %w", slice.Name, err)
}
}
case slice.Spec.PerDeviceNodeSelection:
// We add the slice here regardless of whether the partitionable devices feature is
// enabled. If we don't, the full slice will be considered incomplete. So we filter
// out devices that have fields from the partitionable devices feature set later.
if err := addSlice(pools, slice, node, partitionableDevicesEnabled); err != nil {
return nil, fmt.Errorf("add cluster slice %s: %w", slice.Name, err)
case slice.Spec.PerDeviceNodeSelection != nil && *slice.Spec.PerDeviceNodeSelection:
for _, device := range slice.Spec.Devices {
if device.Basic == nil {
continue
}
var nodeName string
var allNodes bool
if device.Basic.NodeName != nil {
nodeName = *device.Basic.NodeName
}
if device.Basic.AllNodes != nil {
allNodes = *device.Basic.AllNodes
}
match, err := nodeMatches(node, nodeName, allNodes, device.Basic.NodeSelector)
if err != nil {
return nil, fmt.Errorf("failed to perform node selection for device %s in slice %s: %w",
device.String(), slice.Name, err)
}
if match {
if err := addSlice(pools, slice); err != nil {
return nil, fmt.Errorf("failed to add node slice %s: %w", slice.Name, err)
}
break
}
}
default:
// Nothing known was set. This must be some future, unknown extension,
@ -94,16 +118,9 @@ func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node
return result, nil
}
func addSlice(pools map[PoolID]*Pool, s *resourceapi.ResourceSlice, node *v1.Node, partitionableDevicesEnabled bool) error {
func addSlice(pools map[PoolID]*Pool, s *resourceapi.ResourceSlice) error {
var slice draapi.ResourceSlice
sliceScope := draapi.SliceScope{
SliceContext: draapi.SliceContext{
Slice: s,
Node: node,
PartitionableDevicesEnabled: partitionableDevicesEnabled,
},
}
if err := draapi.Convert_v1beta1_ResourceSlice_To_api_ResourceSlice(s, &slice, sliceScope); err != nil {
if err := draapi.Convert_v1beta1_ResourceSlice_To_api_ResourceSlice(s, &slice, nil); err != nil {
return fmt.Errorf("convert ResourceSlice: %w", err)
}
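The new nodeMatches helper centralizes the precedence used both for whole slices and, via the per-device branch above, for individual devices: an explicit node name wins, then allNodesMatch, then the node selector, and nothing set means no match. A quick illustration against a hypothetical node (not part of this commit; assumes the usual v1 and metav1 imports):

node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

m, _ := nodeMatches(node, "node-1", false, nil) // true: the node name matches
m, _ = nodeMatches(node, "node-2", true, nil)   // false: a non-matching node name takes precedence over allNodesMatch
m, _ = nodeMatches(node, "", true, nil)         // true: allNodesMatch
m, _ = nodeMatches(node, "", false, nil)        // false: nothing set, treated as an unknown selection mechanism
_ = m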

View File

@ -338,11 +338,11 @@ claims:
}
}
allocator, err := structured.NewAllocator(tCtx,
utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess),
utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList),
utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
[]*resourceapi.ResourceClaim{claim}, allocatedDevices, draManager.DeviceClasses(), slices, celCache)
allocator, err := structured.NewAllocator(tCtx, structured.Features{
PrioritizedList: utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList),
AdminAccess: utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess),
DeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
}, []*resourceapi.ResourceClaim{claim}, allocatedDevices, draManager.DeviceClasses(), slices, celCache)
tCtx.ExpectNoError(err, "create allocator")
rand.Shuffle(len(nodes), func(i, j int) {