diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 098a57fff44..4d564f44073 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -102,10 +102,11 @@ type informationForClaim struct { // DynamicResources is a plugin that ensures that ResourceClaims are allocated. type DynamicResources struct { - enabled bool - enableAdminAccess bool - enablePrioritizedList bool - enableSchedulingQueueHint bool + enabled bool + enableAdminAccess bool + enablePrioritizedList bool + enableSchedulingQueueHint bool + enablePartitionableDevices bool enableDeviceTaints bool fh framework.Handle @@ -122,11 +123,12 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe } pl := &DynamicResources{ - enabled: true, - enableAdminAccess: fts.EnableDRAAdminAccess, - enableDeviceTaints: fts.EnableDRADeviceTaints, - enablePrioritizedList: fts.EnableDRAPrioritizedList, - enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, + enabled: true, + enableAdminAccess: fts.EnableDRAAdminAccess, + enableDeviceTaints: fts.EnableDRADeviceTaints, + enablePrioritizedList: fts.EnableDRAPrioritizedList, + enableSchedulingQueueHint: fts.EnableSchedulingQueueHint, + enablePartitionableDevices: fts.EnablePartitionableDevices, fh: fh, clientset: fh.ClientSet(), @@ -455,8 +457,9 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl return nil, statusError(logger, err) } features := structured.Features{ - AdminAccess: pl.enableAdminAccess, - PrioritizedList: pl.enablePrioritizedList, + AdminAccess: pl.enableAdminAccess, + PrioritizedList: pl.enablePrioritizedList, + PartitionableDevices: pl.enablePartitionableDevices, DeviceTaints: pl.enableDeviceTaints, } allocator, err := structured.NewAllocator(ctx, features, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache) diff --git a/pkg/scheduler/framework/plugins/feature/feature.go b/pkg/scheduler/framework/plugins/feature/feature.go index 64bdf78cb5d..7e7f1a1b8d8 100644 --- a/pkg/scheduler/framework/plugins/feature/feature.go +++ b/pkg/scheduler/framework/plugins/feature/feature.go @@ -33,5 +33,6 @@ type Features struct { EnableSchedulingQueueHint bool EnableAsyncPreemption bool EnablePodLevelResources bool + EnablePartitionableDevices bool EnableStorageCapacityScoring bool } diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 44b94bbd489..3e3f27092b8 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -59,6 +59,7 @@ func NewInTreeRegistry() runtime.Registry { EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints), EnableAsyncPreemption: feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption), EnablePodLevelResources: feature.DefaultFeatureGate.Enabled(features.PodLevelResources), + EnablePartitionableDevices: feature.DefaultFeatureGate.Enabled(features.DRAPartitionableDevices), EnableStorageCapacityScoring: feature.DefaultFeatureGate.Enabled(features.StorageCapacityScoring), } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 248df0e66eb..3e9c052ec40 100644 --- 
a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -141,7 +141,9 @@ type Pool struct { // Slice is turned into one ResourceSlice by the controller. type Slice struct { // Devices lists all devices which are part of the slice. - Devices []resourceapi.Device + Devices []resourceapi.Device + SharedCounters []resourceapi.CounterSet + PerDeviceNodeSelection *bool } // +k8s:deepcopy-gen=true @@ -548,7 +550,9 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { if !apiequality.Semantic.DeepEqual(&currentSlice.Spec.Pool, &desiredPool) || !apiequality.Semantic.DeepEqual(currentSlice.Spec.NodeSelector, pool.NodeSelector) || currentSlice.Spec.AllNodes != desiredAllNodes || - !DevicesDeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) { + !DevicesDeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) || + !apiequality.Semantic.DeepEqual(currentSlice.Spec.SharedCounters, pool.Slices[i].SharedCounters) || + !apiequality.Semantic.DeepEqual(currentSlice.Spec.PerDeviceNodeSelection, pool.Slices[i].PerDeviceNodeSelection) { changedDesiredSlices.Insert(i) logger.V(5).Info("Need to update slice", "slice", klog.KObj(currentSlice), "matchIndex", i) } @@ -588,6 +592,8 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { // have listed the existing slice. slice.Spec.NodeSelector = pool.NodeSelector slice.Spec.AllNodes = desiredAllNodes + slice.Spec.SharedCounters = pool.Slices[i].SharedCounters + slice.Spec.PerDeviceNodeSelection = pool.Slices[i].PerDeviceNodeSelection // Preserve TimeAdded from existing device, if there is a matching device and taint. slice.Spec.Devices = copyTaintTimeAdded(slice.Spec.Devices, pool.Slices[i].Devices) @@ -639,12 +645,14 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { GenerateName: generateName, }, Spec: resourceapi.ResourceSliceSpec{ - Driver: c.driverName, - Pool: desiredPool, - NodeName: nodeName, - NodeSelector: pool.NodeSelector, - AllNodes: desiredAllNodes, - Devices: pool.Slices[i].Devices, + Driver: c.driverName, + Pool: desiredPool, + NodeName: nodeName, + NodeSelector: pool.NodeSelector, + AllNodes: desiredAllNodes, + Devices: pool.Slices[i].Devices, + SharedCounters: pool.Slices[i].SharedCounters, + PerDeviceNodeSelection: pool.Slices[i].PerDeviceNodeSelection, }, } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go index beab5e5b9c4..f80b4e42f12 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go @@ -27,7 +27,6 @@ import ( v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1beta1" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/dynamic-resource-allocation/api" draapi "k8s.io/dynamic-resource-allocation/api" "k8s.io/dynamic-resource-allocation/cel" "k8s.io/dynamic-resource-allocation/resourceclaim" @@ -61,6 +60,7 @@ type Features struct { AdminAccess bool PrioritizedList bool PartitionableDevices bool + DeviceTaints bool } // NewAllocator returns an allocator for a certain set of claims or an error if @@ -118,6 +118,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] Allocator: a, ctx: ctx, // all methods share the same a and thus ctx logger: klog.FromContext(ctx), + node: node,
deviceMatchesRequest: make(map[matchKey]bool), constraints: make([][]constraint, len(a.claimsToAllocate)), requestData: make(map[requestIndices]requestData), @@ -127,7 +128,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] defer alloc.logger.V(5).Info("Done with allocation", "success", len(finalResult) == len(alloc.claimsToAllocate), "err", finalErr) // First determine all eligible pools. - pools, err := GatherPools(ctx, alloc.slices, node, a.features.PartitionableDevices) + pools, err := GatherPools(ctx, alloc.slices, node, a.features) if err != nil { return nil, fmt.Errorf("gather pool information: %w", err) } @@ -175,7 +176,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult [] // Error out if the prioritizedList feature is not enabled and the request // has subrequests. This is to avoid surprising behavior for users. - if !a.prioritizedListEnabled && hasSubRequests { + if !a.features.PrioritizedList && hasSubRequests { return nil, fmt.Errorf("claim %s, request %s: has subrequests, but the DRAPrioritizedList feature is disabled", klog.KObj(claim), request.Name) } @@ -385,7 +386,7 @@ func (a *allocator) validateDeviceRequest(request requestAccessor, parentRequest } } - if !a.adminAccessEnabled && request.hasAdminAccess() { + if !a.features.AdminAccess && request.hasAdminAccess() { return requestData, fmt.Errorf("claim %s, request %s: admin access is requested, but the feature is disabled", klog.KObj(claim), request.name()) } @@ -462,6 +463,7 @@ type allocator struct { *Allocator ctx context.Context logger klog.Logger + node *v1.Node pools []*Pool deviceMatchesRequest map[matchKey]bool constraints [][]constraint // one list of constraints per claim @@ -514,9 +516,9 @@ type requestData struct { } type deviceWithID struct { - id DeviceID - device *draapi.Device - slice *draapi.ResourceSlice + id DeviceID + basic *draapi.BasicDevice + slice *draapi.ResourceSlice } type internalAllocationResult struct { @@ -527,7 +529,7 @@ type internalDeviceResult struct { request string // name of the request (if no subrequests) or the subrequest parentRequest string // name of the request which contains the subrequest, empty otherwise id DeviceID - device *draapi.Device + basic *draapi.BasicDevice slice *draapi.ResourceSlice adminAccess *bool } @@ -623,7 +625,7 @@ func (m *matchAttributeConstraint) add(requestName, subRequestName string, devic return true } -func (m *matchAttributeConstraint) remove(requestName, subRequestName string, device *draapi.Device, deviceID DeviceID) { +func (m *matchAttributeConstraint) remove(requestName, subRequestName string, device *draapi.BasicDevice, deviceID DeviceID) { if m.requestNames.Len() > 0 && !m.matches(requestName, subRequestName) { // Device not affected by constraint. return @@ -642,7 +644,7 @@ func (m *matchAttributeConstraint) matches(requestName, subRequestName string) b } } -func lookupAttribute(device *draapi.Device, deviceID DeviceID, attributeName draapi.FullyQualifiedName) *draapi.DeviceAttribute { +func lookupAttribute(device *draapi.BasicDevice, deviceID DeviceID, attributeName draapi.FullyQualifiedName) *draapi.DeviceAttribute { // Fully-qualified match? if attr, ok := device.Attributes[draapi.QualifiedName(attributeName)]; ok { return &attr @@ -809,9 +811,9 @@ func (alloc *allocator) allocateOne(r deviceIndices, allocateSubRequest bool) (b // Finally treat as allocated and move on to the next device. 
device := deviceWithID{ - id: deviceID, - device: &slice.Spec.Devices[deviceIndex], - slice: slice, + id: deviceID, + basic: slice.Spec.Devices[deviceIndex].Basic, + slice: slice, } allocated, deallocate, err := alloc.allocateDevice(r, device, false) if err != nil { @@ -885,12 +887,31 @@ func (alloc *allocator) isSelectable(r requestIndices, requestData requestData, return false, nil } + if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) { + var nodeName string + var allNodes bool + if device.NodeName != nil { + nodeName = *device.NodeName + } + if device.AllNodes != nil { + allNodes = *device.AllNodes + } + matches, err := nodeMatches(alloc.node, nodeName, allNodes, device.NodeSelector) + if err != nil { + return false, err + } + if !matches { + alloc.deviceMatchesRequest[matchKey] = false + return false, nil + } + } + alloc.deviceMatchesRequest[matchKey] = true return true, nil } -func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.Device, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { +func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) { for i, selector := range selectors { expr := alloc.celCache.GetOrCompile(selector.CEL.Expression) if expr.Error != nil { @@ -905,15 +926,13 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.Device, return false, fmt.Errorf("claim %s: selector #%d: CEL compile error: %w", klog.KObj(alloc.claimsToAllocate[r.claimIndex]), i, expr.Error) } - attributes := make(map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) - if err := draapi.Convert_api_Attributes_To_v1beta1_Attributes(device.Attributes, attributes); err != nil { - return false, fmt.Errorf("convert attributes: %w", err) + // If this conversion turns out to be expensive, the CEL package could be converted + // to use unique strings. + var d resourceapi.BasicDevice + if err := draapi.Convert_api_BasicDevice_To_v1beta1_BasicDevice(device, &d, nil); err != nil { + return false, fmt.Errorf("convert BasicDevice: %w", err) } - capacity := make(map[resourceapi.QualifiedName]resourceapi.DeviceCapacity) - if err := draapi.Convert_api_Capacity_To_v1beta1_Capacity(device.Capacity, capacity); err != nil { - return false, fmt.Errorf("convert capacity: %w", err) - } - matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: attributes, Capacity: capacity}) + matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: d.Attributes, Capacity: d.Capacity}) if class != nil { alloc.logger.V(7).Info("CEL result", "device", deviceID, "class", klog.KObj(class), "selector", i, "expression", selector.CEL.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err) } else { @@ -953,15 +972,18 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus return false, nil, nil } - // If a device consumes capacity from a capacity pool, verify that - // there is sufficient capacity available. - ok, err := alloc.checkAvailableCapacity(device) - if err != nil { - return false, nil, err - } - if !ok { - alloc.logger.V(7).Info("Insufficient capacity", "device", device.id) - return false, nil, nil + // API validation has already checked that every counter set referenced in ConsumesCounter exists in SharedCounters.
+ if alloc.features.PartitionableDevices && len(device.basic.ConsumesCounter) > 0 { + // If the device consumes counters from one or more shared counter sets, verify that + // enough of each counter is still available. + ok, err := alloc.checkAvailableCapacity(device) + if err != nil { + return false, nil, err + } + if !ok { + alloc.logger.V(7).Info("Insufficient capacity", "device", device.id) + return false, nil, nil + } } var parentRequestName string @@ -977,13 +999,13 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus // Might be tainted, in which case the taint has to be tolerated. // The check is skipped if the feature is disabled. - if alloc.deviceTaintsEnabled && !allTaintsTolerated(device.basic, request) { + if alloc.features.DeviceTaints && !allTaintsTolerated(device.basic, request) { return false, nil, nil } // It's available. Now check constraints. for i, constraint := range alloc.constraints[r.claimIndex] { - added := constraint.add(baseRequestName, subRequestName, device.device, device.id) + added := constraint.add(baseRequestName, subRequestName, device.basic, device.id) if !added { if must { // It does not make sense to declare a claim where a constraint prevents getting @@ -993,7 +1015,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus // Roll back for all previous constraints before we return. for e := 0; e < i; e++ { - alloc.constraints[r.claimIndex][e].remove(baseRequestName, subRequestName, device.device, device.id) + alloc.constraints[r.claimIndex][e].remove(baseRequestName, subRequestName, device.basic, device.id) } return false, nil, nil } @@ -1009,7 +1031,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus request: request.name(), parentRequest: parentRequestName, id: device.id, - device: device.device, + basic: device.basic, slice: device.slice, } if request.adminAccess() { @@ -1020,7 +1042,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus return true, func() { for _, constraint := range alloc.constraints[r.claimIndex] { - constraint.remove(baseRequestName, subRequestName, device.device, device.id) + constraint.remove(baseRequestName, subRequestName, device.basic, device.id) } if !request.adminAccess() { alloc.allocatingDevices[device.id] = false @@ -1052,23 +1074,24 @@ func taintTolerated(taint resourceapi.DeviceTaint, request requestAccessor) bool func (alloc *allocator) checkAvailableCapacity(device deviceWithID) (bool, error) { slice := device.slice - referencedCapacityPools := sets.New[api.UniqueString]() - for _, consumedCapacity := range device.device.ConsumesCapacity { - referencedCapacityPools.Insert(consumedCapacity.CapacityPool) + referencedSharedCounters := sets.New[draapi.UniqueString]() + for _, consumedCounter := range device.basic.ConsumesCounter { + referencedSharedCounters.Insert(consumedCounter.SharedCounter) } - // Create a structure that captures the initial capacity for all pools + // Create a structure that captures the initial counter values for all SharedCounters // referenced by the device.
- availableCapacities := make(map[api.UniqueString]map[api.QualifiedName]api.DeviceCapacity) - for _, capacityPool := range slice.Spec.CapacityPools { - if !referencedCapacityPools.Has(capacityPool.Name) { + availableCounters := make(map[draapi.UniqueString]map[string]draapi.Counter) + for _, counterSet := range slice.Spec.SharedCounters { + if !referencedSharedCounters.Has(counterSet.Name) { + // Skip counter sets that are not referenced by the device. API validation ensures that every referenced counter set exists in SharedCounters. continue } - poolCapacity := make(map[api.QualifiedName]api.DeviceCapacity) - for name, cap := range capacityPool.Capacity { - poolCapacity[name] = cap + counterShared := make(map[string]draapi.Counter, len(counterSet.Counters)) + for name, cap := range counterSet.Counters { + counterShared[name] = cap } - availableCapacities[capacityPool.Name] = poolCapacity + availableCounters[counterSet.Name] = counterShared } // Update the data structure to reflect capacity already in use. @@ -1078,30 +1101,30 @@ func (alloc *allocator) checkAvailableCapacity(device deviceWithID) (bool, error Pool: slice.Spec.Pool.Name, Device: device.Name, } - if !(alloc.allocatedDevices.Has(deviceID) || alloc.allocatingDevices[deviceID]) { + if !alloc.allocatedDevices.Has(deviceID) && !alloc.allocatingDevices[deviceID] { continue } - for _, consumedCapacity := range device.ConsumesCapacity { - poolCapacity := availableCapacities[consumedCapacity.CapacityPool] - for name, cap := range consumedCapacity.Capacity { - existingCap, ok := poolCapacity[name] + for _, consumedCounter := range device.Basic.ConsumesCounter { + counterShared := availableCounters[consumedCounter.SharedCounter] + for name, cap := range consumedCounter.Counters { + existingCap, ok := counterShared[name] if !ok { - // Just continue for now, but this probably should be an error. + // API validation ensures that every counter referenced in ConsumesCounter exists in its counter set, so this should not happen. continue } // This can potentially result in negative available capacity. That is fine, // we just treat it as no capacity available. existingCap.Value.Sub(cap.Value) - poolCapacity[name] = existingCap + counterShared[name] = existingCap } } } // Check if all consumed capacities for the device can be satisfied. - for _, deviceConsumedCapacity := range device.device.ConsumesCapacity { - poolCapacity := availableCapacities[deviceConsumedCapacity.CapacityPool] - for name, cap := range deviceConsumedCapacity.Capacity { - availableCap, found := poolCapacity[name] + for _, deviceConsumedCounter := range device.basic.ConsumesCounter { + counterShared := availableCounters[deviceConsumedCounter.SharedCounter] + for name, cap := range deviceConsumedCounter.Counters { + availableCap, found := counterShared[name] // If the device requests a capacity that doesn't exist in // the pool, it can not be allocated.
if !found { @@ -1131,9 +1154,11 @@ func (alloc *allocator) createNodeSelector(result []internalDeviceResult) (*v1.N slice := result[i].slice var nodeName draapi.UniqueString var nodeSelector *v1.NodeSelector - if slice.Spec.PerDeviceNodeSelection { - nodeName = result[i].device.NodeName - nodeSelector = result[i].device.NodeSelector + if ptr.Deref(slice.Spec.PerDeviceNodeSelection, false) { + if result[i].basic.NodeName != nil { + nodeName = draapi.MakeUniqueString(*result[i].basic.NodeName) + } + nodeSelector = result[i].basic.NodeSelector } else { nodeName = slice.Spec.NodeName nodeSelector = slice.Spec.NodeSelector diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go index 36d333db515..7a6c311b5f7 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/allocator_test.go @@ -253,31 +253,29 @@ const ( fromDeviceCapacityConsumption = "fromDeviceCapacityConsumption" ) -func compositeDevice(name string, capacity any, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute, - consumesCapacity ...resourceapi.DeviceCapacityConsumption) resourceapi.Device { +func partitionableDevice(name string, capacity any, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute, + consumesCapacity ...resourceapi.DeviceCounterConsumption) resourceapi.Device { device := resourceapi.Device{ Name: name, - Composite: &resourceapi.CompositeDevice{ + Basic: &resourceapi.BasicDevice{ Attributes: attributes, }, } switch capacity := capacity.(type) { case map[resourceapi.QualifiedName]resource.Quantity: - device.Composite.Capacity = toDeviceCapacity(capacity) + device.Basic.Capacity = toDeviceCapacity(capacity) case string: if capacity == fromDeviceCapacityConsumption { c := make(map[resourceapi.QualifiedName]resourceapi.DeviceCapacity) for _, dcc := range consumesCapacity { - for name, cap := range dcc.Capacity { - if _, found := c[name]; found { - panic(fmt.Sprintf("same capacity found in multiple device capacity consumptions %q", name)) - } - c[name] = cap + for name, cap := range dcc.Counters { + ccap := resourceapi.DeviceCapacity(cap) + c[resourceapi.QualifiedName(name)] = ccap } } - device.Composite.Capacity = c + device.Basic.Capacity = c } else { panic(fmt.Sprintf("unexpected capacity value %q", capacity)) } @@ -287,29 +285,32 @@ func compositeDevice(name string, capacity any, attributes map[resourceapi.Quali panic(fmt.Sprintf("unexpected capacity type %T: %+v", capacity, capacity)) } - device.Composite.ConsumesCapacity = consumesCapacity + device.Basic.ConsumesCounter = consumesCapacity return device } -func compositeDeviceWithNodeSelector(name string, nodeSelection any, capacity any, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute, - consumesCapacity ...resourceapi.DeviceCapacityConsumption) resourceapi.Device { - device := compositeDevice(name, capacity, attributes, consumesCapacity...) +func partitionableDeviceWithNodeSelector(name string, nodeSelection any, capacity any, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute, + consumesCapacity ...resourceapi.DeviceCounterConsumption) resourceapi.Device { + device := partitionableDevice(name, capacity, attributes, consumesCapacity...) 
switch nodeSelection := nodeSelection.(type) { case *v1.NodeSelector: - device.Composite.NodeSelector = nodeSelection + device.Basic.NodeSelector = nodeSelection case string: if nodeSelection == nodeSelectionAll { - device.Composite.AllNodes = true + device.Basic.AllNodes = func() *bool { + r := true + return &r + }() } else if nodeSelection == nodeSelectionPerDevice { panic("nodeSelectionPerDevice is not supported for devices") } else { - device.Composite.NodeName = nodeSelection + device.Basic.NodeName = &nodeSelection } default: panic(fmt.Sprintf("unexpected nodeSelection type %T: %+v", nodeSelection, nodeSelection)) } - return wrapDevice(device) + return device } type wrapDevice resourceapi.Device @@ -325,10 +326,10 @@ func (in wrapDevice) withTaints(taints ...resourceapi.DeviceTaint) wrapDevice { return wrapDevice(*device) } -func deviceCapacityConsumption(capacityPool string, capacity map[resourceapi.QualifiedName]resource.Quantity) resourceapi.DeviceCapacityConsumption { - return resourceapi.DeviceCapacityConsumption{ - CapacityPool: capacityPool, - Capacity: toDeviceCapacity(capacity), +func deviceCapacityConsumption(capacityPool string, capacity map[resourceapi.QualifiedName]resource.Quantity) resourceapi.DeviceCounterConsumption { + return resourceapi.DeviceCounterConsumption{ + SharedCounter: capacityPool, + Counters: toDeviceCounter(capacity), } } @@ -367,7 +368,10 @@ func slice(name string, nodeSelection any, pool, driver string, devices ...wrapD if nodeSelection == nodeSelectionAll { slice.Spec.AllNodes = true } else if nodeSelection == nodeSelectionPerDevice { - slice.Spec.PerDeviceNodeSelection = true + slice.Spec.PerDeviceNodeSelection = func() *bool { + r := true + return &r + }() } else { slice.Spec.NodeName = nodeSelection } @@ -526,17 +530,17 @@ func sliceWithMultipleDevices(name string, nodeSelection any, pool, driver strin return slice(name, nodeSelection, pool, driver, devices...) 
} -func sliceWithCapacityPools(name string, nodeSelection any, pool, driver string, capacityPools []resourceapi.CapacityPool, devices ...resourceapi.Device) *resourceapi.ResourceSlice { +func sliceWithCapacityPools(name string, nodeSelection any, pool, driver string, sharedCounters []resourceapi.CounterSet, devices ...resourceapi.Device) *resourceapi.ResourceSlice { slice := slice(name, nodeSelection, pool, driver) - slice.Spec.CapacityPools = capacityPools + slice.Spec.SharedCounters = sharedCounters slice.Spec.Devices = devices return slice } -func capacityPool(name string, capacity map[resourceapi.QualifiedName]resource.Quantity) resourceapi.CapacityPool { - return resourceapi.CapacityPool{ +func counterSet(name string, capacity map[resourceapi.QualifiedName]resource.Quantity) resourceapi.CounterSet { + return resourceapi.CounterSet{ Name: name, - Capacity: toDeviceCapacity(capacity), + Counters: toDeviceCounter(capacity), } } @@ -548,6 +552,14 @@ func toDeviceCapacity(capacity map[resourceapi.QualifiedName]resource.Quantity) return out } +func toDeviceCounter(capacity map[resourceapi.QualifiedName]resource.Quantity) map[string]resourceapi.Counter { + out := make(map[string]resourceapi.Counter, len(capacity)) + for name, quantity := range capacity { + out[string(name)] = resourceapi.Counter{Value: quantity} + } + return out +} + func TestAllocator(t *testing.T) { nonExistentAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "NonExistentAttribute") boolAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "boolAttribute") @@ -1141,7 +1153,9 @@ func TestAllocator(t *testing.T) { )}, }, "all-devices-slice-without-devices-prioritized-list": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( func() wrapResourceClaim { claim := claimWithRequests(claim0, nil, @@ -1167,7 +1181,9 @@ func TestAllocator(t *testing.T) { )}, }, "all-devices-no-slices-prioritized-list": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( func() wrapResourceClaim { claim := claimWithRequests(claim0, nil, @@ -1192,7 +1208,9 @@ func TestAllocator(t *testing.T) { )}, }, "all-devices-some-allocated-prioritized-list": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( func() wrapResourceClaim { claim := claimWithRequests(claim0, nil, @@ -1647,6 +1665,18 @@ func TestAllocator(t *testing.T) { expectError: gomega.MatchError(gomega.ContainSubstring("empty constraint (unsupported constraint type?)")), }, + "unknown-device": { + claimsToAllocate: objects(claim(claim0, req0, classA)), + classes: objects(class(classA, driverA)), + slices: objects( + func() *resourceapi.ResourceSlice { + slice := sliceWithOneDevice(slice1, node1, pool1, driverA) + slice.Spec.Devices[0].Basic = nil /* empty = unknown future extension */ + return slice + }(), + ), + node: node(node1, region1), + }, "invalid-CEL-one-device": { claimsToAllocate: objects( func() wrapResourceClaim { @@ -1724,7 +1754,9 @@ func TestAllocator(t *testing.T) { expectError: gomega.MatchError(gomega.ContainSubstring("exceeds the claim limit")), }, "prioritized-list-first-unavailable": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0, subRequest(subReq0, classB, 1), subRequest(subReq1, classA, 1), @@ -1739,7 +1771,9 @@ func TestAllocator(t *testing.T) { )}, }, 
"prioritized-list-non-available": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0, subRequest(subReq0, classB, 2), subRequest(subReq1, classA, 2), @@ -1754,7 +1788,9 @@ func TestAllocator(t *testing.T) { expectResults: nil, }, "prioritized-list-device-config": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( claimWithAll(claim0, objects( @@ -1795,7 +1831,9 @@ func TestAllocator(t *testing.T) { )}, }, "prioritized-list-class-config": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0, subRequest(subReq0, classA, 2), subRequest(subReq1, classB, 2), @@ -1831,7 +1869,9 @@ func TestAllocator(t *testing.T) { )}, }, "prioritized-list-subrequests-with-expressions": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( claimWithRequests(claim0, nil, request(req0, classA, 1, resourceapi.DeviceSelector{ @@ -1873,7 +1913,9 @@ func TestAllocator(t *testing.T) { )}, }, "prioritized-list-subrequests-with-constraints-ref-parent-request": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( claimWithRequests(claim0, []resourceapi.DeviceConstraint{ @@ -1941,7 +1983,9 @@ func TestAllocator(t *testing.T) { )}, }, "prioritized-list-subrequests-with-constraints-ref-sub-request": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( claimWithRequests(claim0, []resourceapi.DeviceConstraint{ @@ -2001,7 +2045,9 @@ func TestAllocator(t *testing.T) { )}, }, "prioritized-list-subrequests-with-allocation-mode-all": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( func() wrapResourceClaim { claim := claimWithRequests(claim0, nil, @@ -2033,7 +2079,9 @@ func TestAllocator(t *testing.T) { )}, }, "prioritized-list-allocation-mode-all-multiple-requests": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( claimWithRequests(claim0, nil, request(req0, classA, 1), @@ -2064,7 +2112,9 @@ func TestAllocator(t *testing.T) { )}, }, "prioritized-list-disabled": { - prioritizedList: false, + features: Features{ + PrioritizedList: false, + }, claimsToAllocate: objects( func() wrapResourceClaim { claim := claimWithRequests(claim0, nil, @@ -2083,7 +2133,9 @@ func TestAllocator(t *testing.T) { expectError: gomega.MatchError(gomega.ContainSubstring("claim claim-0, request req-0: has subrequests, but the DRAPrioritizedList feature is disabled")), }, "prioritized-list-multi-request": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( claimWithRequests(claim0, nil, request(req1, classA, 1, resourceapi.DeviceSelector{ @@ -2132,7 +2184,9 @@ func TestAllocator(t *testing.T) { )}, }, "prioritized-list-with-backtracking": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects( claimWithRequests(claim0, nil, requestWithPrioritizedList(req0, @@ -2181,7 +2235,9 @@ func TestAllocator(t *testing.T) { )}, }, "prioritized-list-too-many-in-first-subrequest": { - prioritizedList: true, + features: Features{ + PrioritizedList: true, + }, claimsToAllocate: objects(claimWithRequests(claim0, nil, 
requestWithPrioritizedList(req0, subRequest(subReq0, classB, 500), subRequest(subReq1, classA, 1), @@ -2205,13 +2261,13 @@ func TestAllocator(t *testing.T) { classes: objects(class(classA, driverA)), slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA, objects( - capacityPool(capacityPool1, + counterSet(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("8Gi"), }, ), ), - compositeDevice(device1, nil, nil, + partitionableDevice(device1, nil, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), @@ -2225,7 +2281,7 @@ func TestAllocator(t *testing.T) { deviceAllocationResult(req0, driverA, pool1, device1, false), )}, }, - "partitionable-devices-multiple-devices": { + "partitionable-devices-prioritized-list": { features: Features{ PrioritizedList: true, PartitionableDevices: true, @@ -2250,13 +2306,13 @@ func TestAllocator(t *testing.T) { classes: objects(class(classA, driverA)), slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA, objects( - capacityPool(capacityPool1, + counterSet(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("8Gi"), }, ), ), - compositeDevice(device1, + partitionableDevice(device1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), }, nil, @@ -2266,7 +2322,7 @@ func TestAllocator(t *testing.T) { }, ), ), - compositeDevice(device2, + partitionableDevice(device2, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("6Gi"), }, nil, @@ -2276,7 +2332,7 @@ func TestAllocator(t *testing.T) { }, ), ), - compositeDevice(device3, fromDeviceCapacityConsumption, nil, + partitionableDevice(device3, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), @@ -2291,6 +2347,60 @@ func TestAllocator(t *testing.T) { deviceAllocationResult(req1SubReq1, driverA, pool1, device3, false), )}, }, + "partitionable-devices-multiple-devices": { + features: Features{ + PartitionableDevices: true, + }, + claimsToAllocate: objects( + claimWithRequests(claim0, nil, + request(req0, classA, 1), + request(req1, classA, 1), + ), + ), + classes: objects(class(classA, driverA)), + slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA, + objects( + counterSet(capacityPool1, + map[resourceapi.QualifiedName]resource.Quantity{ + "memory": resource.MustParse("8Gi"), + }, + ), + ), + partitionableDevice(device1, + map[resourceapi.QualifiedName]resource.Quantity{ + "memory": resource.MustParse("4Gi"), + }, nil, + deviceCapacityConsumption(capacityPool1, + map[resourceapi.QualifiedName]resource.Quantity{ + "memory": resource.MustParse("4Gi"), + }, + ), + ), + partitionableDevice(device2, + map[resourceapi.QualifiedName]resource.Quantity{ + "memory": resource.MustParse("6Gi"), + }, nil, + deviceCapacityConsumption(capacityPool1, + map[resourceapi.QualifiedName]resource.Quantity{ + "memory": resource.MustParse("6Gi"), + }, + ), + ), + partitionableDevice(device3, fromDeviceCapacityConsumption, nil, + deviceCapacityConsumption(capacityPool1, + map[resourceapi.QualifiedName]resource.Quantity{ + "memory": resource.MustParse("4Gi"), + }, + ), + ), + )), + node: node(node1, region1), + expectResults: []any{allocationResult( + localNodeSelector(node1), + deviceAllocationResult(req0, driverA, pool1, device1, false), + deviceAllocationResult(req1, 
driverA, pool1, device3, false), + )}, + }, "partitionable-devices-multiple-capacity-pools": { features: Features{ PrioritizedList: true, @@ -2316,18 +2426,18 @@ func TestAllocator(t *testing.T) { classes: objects(class(classA, driverA)), slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA, objects( - capacityPool(capacityPool1, + counterSet(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("18Gi"), }, ), - capacityPool(capacityPool2, + counterSet(capacityPool2, map[resourceapi.QualifiedName]resource.Quantity{ "cpus": resource.MustParse("8"), }, ), ), - compositeDevice(device1, fromDeviceCapacityConsumption, nil, + partitionableDevice(device1, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), @@ -2339,7 +2449,7 @@ func TestAllocator(t *testing.T) { }, ), ), - compositeDevice(device2, fromDeviceCapacityConsumption, nil, + partitionableDevice(device2, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("6Gi"), @@ -2351,7 +2461,7 @@ func TestAllocator(t *testing.T) { }, ), ), - compositeDevice(device3, fromDeviceCapacityConsumption, nil, + partitionableDevice(device3, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), @@ -2371,6 +2481,82 @@ func TestAllocator(t *testing.T) { deviceAllocationResult(req1SubReq1, driverA, pool1, device3, false), )}, }, + "partitionable-devices-multiple-counters": { + features: Features{ + PartitionableDevices: true, + }, + claimsToAllocate: objects( + claimWithRequests(claim0, nil, + request(req0, classA, 1), + request(req1, classA, 1), + ), + ), + classes: objects(class(classA, driverA)), + slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA, + objects( + counterSet(capacityPool1, + map[resourceapi.QualifiedName]resource.Quantity{ + "cpus": resource.MustParse("8"), + "memory": resource.MustParse("18Gi"), + }, + ), + counterSet(capacityPool2, + map[resourceapi.QualifiedName]resource.Quantity{ + "cpus": resource.MustParse("12"), + "memory": resource.MustParse("18Gi"), + }, + ), + ), + partitionableDevice(device1, fromDeviceCapacityConsumption, nil, + deviceCapacityConsumption(capacityPool1, + map[resourceapi.QualifiedName]resource.Quantity{ + "memory": resource.MustParse("4Gi"), + "cpus": resource.MustParse("6"), + }, + ), + deviceCapacityConsumption(capacityPool2, + map[resourceapi.QualifiedName]resource.Quantity{ + "cpus": resource.MustParse("4"), + "memory": resource.MustParse("2Gi"), + }, + ), + ), + partitionableDevice(device2, fromDeviceCapacityConsumption, nil, + deviceCapacityConsumption(capacityPool1, + map[resourceapi.QualifiedName]resource.Quantity{ + "memory": resource.MustParse("6Gi"), + "cpus": resource.MustParse("4"), + }, + ), + deviceCapacityConsumption(capacityPool2, + map[resourceapi.QualifiedName]resource.Quantity{ + "cpus": resource.MustParse("6"), + "memory": resource.MustParse("6Gi"), + }, + ), + ), + partitionableDevice(device3, fromDeviceCapacityConsumption, nil, + deviceCapacityConsumption(capacityPool1, + map[resourceapi.QualifiedName]resource.Quantity{ + "memory": resource.MustParse("4Gi"), + "cpus": resource.MustParse("4"), + }, + ), + deviceCapacityConsumption(capacityPool2, + map[resourceapi.QualifiedName]resource.Quantity{ + "cpus": 
resource.MustParse("4"), + "memory": resource.MustParse("4Gi"), + }, + ), + ), + )), + node: node(node1, region1), + expectResults: []any{allocationResult( + localNodeSelector(node1), + deviceAllocationResult(req0, driverA, pool1, device2, false), + deviceAllocationResult(req1, driverA, pool1, device3, false), + )}, + }, "partitionable-devices-no-capacity-available": { features: Features{ PartitionableDevices: true, @@ -2383,20 +2569,20 @@ func TestAllocator(t *testing.T) { classes: objects(class(classA, driverA)), slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA, objects( - capacityPool(capacityPool1, + counterSet(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("18Gi"), }, ), ), - compositeDevice(device1, fromDeviceCapacityConsumption, nil, + partitionableDevice(device1, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), }, ), ), - compositeDevice(device2, fromDeviceCapacityConsumption, nil, + partitionableDevice(device2, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("16Gi"), @@ -2422,20 +2608,20 @@ func TestAllocator(t *testing.T) { classes: objects(class(classA, driverA)), slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA, objects( - capacityPool(capacityPool1, + counterSet(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("18Gi"), }, ), ), - compositeDevice(device1, fromDeviceCapacityConsumption, nil, + partitionableDevice(device1, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), }, ), ), - compositeDevice(device2, fromDeviceCapacityConsumption, nil, + partitionableDevice(device2, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("20Gi"), @@ -2461,13 +2647,13 @@ func TestAllocator(t *testing.T) { classes: objects(class(classA, driverA)), slices: objects(sliceWithCapacityPools(slice1, node1, pool1, driverA, objects( - capacityPool(capacityPool1, + counterSet(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("18Gi"), }, ), ), - compositeDevice(device1, nil, nil, + partitionableDevice(device1, nil, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), @@ -2502,20 +2688,20 @@ func TestAllocator(t *testing.T) { classes: objects(class(classA, driverA)), slices: objects(sliceWithCapacityPools(slice1, nodeSelectionPerDevice, pool1, driverA, objects( - capacityPool(capacityPool1, + counterSet(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("18Gi"), }, ), ), - compositeDeviceWithNodeSelector(device1, node1, fromDeviceCapacityConsumption, nil, + partitionableDeviceWithNodeSelector(device1, node1, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), }, ), ), - compositeDeviceWithNodeSelector(device2, node2, fromDeviceCapacityConsumption, nil, + partitionableDeviceWithNodeSelector(device2, node2, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, 
map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("6Gi"), @@ -2540,13 +2726,13 @@ func TestAllocator(t *testing.T) { classes: objects(class(classA, driverA)), slices: objects(sliceWithCapacityPools(slice1, nodeSelectionPerDevice, pool1, driverA, objects( - capacityPool(capacityPool1, + counterSet(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("18Gi"), }, ), ), - compositeDeviceWithNodeSelector(device1, nodeLabelSelector(regionKey, region1), fromDeviceCapacityConsumption, nil, + partitionableDeviceWithNodeSelector(device1, nodeLabelSelector(regionKey, region1), fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), @@ -2580,27 +2766,27 @@ func TestAllocator(t *testing.T) { classes: objects(class(classA, driverA), class(classB, driverB)), slices: objects(sliceWithCapacityPools(slice1, nodeSelectionPerDevice, pool1, driverA, objects( - capacityPool(capacityPool1, + counterSet(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("18Gi"), }, ), ), - compositeDeviceWithNodeSelector(device1, nodeLabelSelector(regionKey, region1), fromDeviceCapacityConsumption, nil, + partitionableDeviceWithNodeSelector(device1, nodeLabelSelector(regionKey, region1), fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), }, ), ), - compositeDeviceWithNodeSelector(device2, node1, fromDeviceCapacityConsumption, nil, + partitionableDeviceWithNodeSelector(device2, node1, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), }, ), ), - compositeDeviceWithNodeSelector(device3, nodeSelectionAll, fromDeviceCapacityConsumption, nil, + partitionableDeviceWithNodeSelector(device3, nodeSelectionAll, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool1, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), @@ -2609,27 +2795,27 @@ func TestAllocator(t *testing.T) { ), ), sliceWithCapacityPools(slice2, node1, pool2, driverB, objects( - capacityPool(capacityPool2, + counterSet(capacityPool2, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("12Gi"), }, ), ), - compositeDevice(device1, fromDeviceCapacityConsumption, nil, + partitionableDevice(device1, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool2, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), }, ), ), - compositeDevice(device2, fromDeviceCapacityConsumption, nil, + partitionableDevice(device2, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool2, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), }, ), ), - compositeDevice(device3, fromDeviceCapacityConsumption, nil, + partitionableDevice(device3, fromDeviceCapacityConsumption, nil, deviceCapacityConsumption(capacityPool2, map[resourceapi.QualifiedName]resource.Quantity{ "memory": resource.MustParse("4Gi"), @@ -2710,7 +2896,7 @@ func TestAllocator(t *testing.T) { }, "tainted-disabled": { features: Features{ - DeviceTaints: true, + DeviceTaints: false, }, claimsToAllocate: objects(claim(claim0, req0, classA)), classes: objects(class(classA, driverA)), @@ -2725,7 +2911,7 @@ func TestAllocator(t 
*testing.T) { }, "tainted-prioritized-list": { features: Features{ - DeviceTaints: true, + DeviceTaints: true, PrioritizedList: true, }, claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0, @@ -2740,7 +2926,7 @@ func TestAllocator(t *testing.T) { }, "tainted-prioritized-list-disabled": { features: Features{ - DeviceTaints: false, + DeviceTaints: false, PrioritizedList: true, }, claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0, @@ -2761,7 +2947,7 @@ func TestAllocator(t *testing.T) { "tainted-admin-access": { features: Features{ DeviceTaints: true, - PrioritizedList: true, + AdminAccess: true, }, claimsToAllocate: func() []wrapResourceClaim { c := claim(claim0, req0, classA) @@ -2781,7 +2967,7 @@ func TestAllocator(t *testing.T) { "tainted-admin-access-disabled": { features: Features{ DeviceTaints: false, - AdminAccess: true, + AdminAccess: true, }, claimsToAllocate: func() []wrapResourceClaim { c := claim(claim0, req0, classA) diff --git a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go index 6d16bc5f13b..fbb247c63b2 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/structured/pools.go @@ -27,48 +27,72 @@ import ( draapi "k8s.io/dynamic-resource-allocation/api" ) +func nodeMatches(node *v1.Node, nodeNameToMatch string, allNodesMatch bool, nodeSelector *v1.NodeSelector) (bool, error) { + switch { + case nodeNameToMatch != "": + return node != nil && node.Name == nodeNameToMatch, nil + case allNodesMatch: + return true, nil + case nodeSelector != nil: + selector, err := nodeaffinity.NewNodeSelector(nodeSelector) + if err != nil { + return false, fmt.Errorf("failed to parse node selector %s: %w", nodeSelector.String(), err) + } + return selector.Match(node), nil + } + + return false, nil +} + // GatherPools collects information about all resource pools which provide // devices that are accessible from the given node. // // Out-dated slices are silently ignored. Pools may be incomplete (not all // required slices available) or invalid (for example, device names not unique). // Both is recorded in the result. -func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node *v1.Node, partitionableDevicesEnabled bool) ([]*Pool, error) { +func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node *v1.Node, features Features) ([]*Pool, error) { pools := make(map[PoolID]*Pool) - nodeName := "" - if node != nil { - nodeName = node.Name - } for _, slice := range slices { + if !features.PartitionableDevices && (len(slice.Spec.SharedCounters) > 0 || slice.Spec.PerDeviceNodeSelection != nil) { + continue + } + switch { - case slice.Spec.NodeName != "": - if slice.Spec.NodeName == nodeName { - if err := addSlice(pools, slice, node, partitionableDevicesEnabled); err != nil { - return nil, fmt.Errorf("add node slice %s: %w", slice.Name, err) - } - } - case slice.Spec.AllNodes: - if err := addSlice(pools, slice, node, partitionableDevicesEnabled); err != nil { - return nil, fmt.Errorf("add cluster slice %s: %w", slice.Name, err) - } - case slice.Spec.NodeSelector != nil: - // TODO: move conversion into api. 
- selector, err := nodeaffinity.NewNodeSelector(slice.Spec.NodeSelector) + case slice.Spec.NodeName != "" || slice.Spec.AllNodes || slice.Spec.NodeSelector != nil: + match, err := nodeMatches(node, slice.Spec.NodeName, slice.Spec.AllNodes, slice.Spec.NodeSelector) if err != nil { - return nil, fmt.Errorf("node selector in resource slice %s: %w", slice.Name, err) + return nil, fmt.Errorf("failed to perform node selection for slice %s: %w", slice.Name, err) } - if selector.Match(node) { - if err := addSlice(pools, slice, node, partitionableDevicesEnabled); err != nil { - return nil, fmt.Errorf("add matching slice %s: %w", slice.Name, err) + if match { + if err := addSlice(pools, slice); err != nil { + return nil, fmt.Errorf("failed to add node slice %s: %w", slice.Name, err) } } - case slice.Spec.PerDeviceNodeSelection: - // We add the slice here regardless of whether the partitionable devices feature is - // enabled. If we don't, the full slice will be considered incomplete. So we filter - // out devices that have fields from the partitionable devices feature set later. - if err := addSlice(pools, slice, node, partitionableDevicesEnabled); err != nil { - return nil, fmt.Errorf("add cluster slice %s: %w", slice.Name, err) + case slice.Spec.PerDeviceNodeSelection != nil && *slice.Spec.PerDeviceNodeSelection: + for _, device := range slice.Spec.Devices { + if device.Basic == nil { + continue + } + var nodeName string + var allNodes bool + if device.Basic.NodeName != nil { + nodeName = *device.Basic.NodeName + } + if device.Basic.AllNodes != nil { + allNodes = *device.Basic.AllNodes + } + match, err := nodeMatches(node, nodeName, allNodes, device.Basic.NodeSelector) + if err != nil { + return nil, fmt.Errorf("failed to perform node selection for device %s in slice %s: %w", + device.String(), slice.Name, err) + } + if match { + if err := addSlice(pools, slice); err != nil { + return nil, fmt.Errorf("failed to add node slice %s: %w", slice.Name, err) + } + break + } } default: // Nothing known was set. 
This must be some future, unknown extension, @@ -94,16 +118,9 @@ func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node return result, nil } -func addSlice(pools map[PoolID]*Pool, s *resourceapi.ResourceSlice, node *v1.Node, partitionableDevicesEnabled bool) error { +func addSlice(pools map[PoolID]*Pool, s *resourceapi.ResourceSlice) error { var slice draapi.ResourceSlice - sliceScope := draapi.SliceScope{ - SliceContext: draapi.SliceContext{ - Slice: s, - Node: node, - PartitionableDevicesEnabled: partitionableDevicesEnabled, - }, - } - if err := draapi.Convert_v1beta1_ResourceSlice_To_api_ResourceSlice(s, &slice, sliceScope); err != nil { + if err := draapi.Convert_v1beta1_ResourceSlice_To_api_ResourceSlice(s, &slice, nil); err != nil { return fmt.Errorf("convert ResourceSlice: %w", err) } diff --git a/test/integration/scheduler_perf/dra.go b/test/integration/scheduler_perf/dra.go index c91fef184b5..8cd7f0a2cfc 100644 --- a/test/integration/scheduler_perf/dra.go +++ b/test/integration/scheduler_perf/dra.go @@ -338,11 +338,11 @@ claims: } } - allocator, err := structured.NewAllocator(tCtx, - utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), - utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList), - utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints), - []*resourceapi.ResourceClaim{claim}, allocatedDevices, draManager.DeviceClasses(), slices, celCache) + allocator, err := structured.NewAllocator(tCtx, structured.Features{ + PrioritizedList: utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList), + AdminAccess: utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), + DeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints), + }, []*resourceapi.ResourceClaim{claim}, allocatedDevices, draManager.DeviceClasses(), slices, celCache) tCtx.ExpectNoError(err, "create allocator") rand.Shuffle(len(nodes), func(i, j int) {
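
A minimal usage sketch of the new API surface introduced by this patch: a driver publishes one pool slice whose devices are partitions of the same GPU and draw from a shared counter set, so the allocator's checkAvailableCapacity can prevent over-allocation. The resourceapi and resourceslice types and field names below come from the patch; the package wiring, device names, and quantities are illustrative assumptions, not part of the change.

package main

import (
	"fmt"

	resourceapi "k8s.io/api/resource/v1beta1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/dynamic-resource-allocation/resourceslice"
)

// examplePool describes two partitions of one GPU. Both consume from the
// shared "gpu-0" counter set, so allocating one partition reduces the
// counters left for the other.
func examplePool() resourceslice.Pool {
	return resourceslice.Pool{
		Slices: []resourceslice.Slice{{
			// PerDeviceNodeSelection is left nil: the pool-level node
			// association (NodeName/NodeSelector/AllNodes) applies.
			SharedCounters: []resourceapi.CounterSet{{
				Name: "gpu-0",
				Counters: map[string]resourceapi.Counter{
					"memory": {Value: resource.MustParse("16Gi")},
				},
			}},
			Devices: []resourceapi.Device{
				partition("gpu-0-partition-0", "8Gi"),
				partition("gpu-0-partition-1", "8Gi"),
			},
		}},
	}
}

// partition builds one partition device that consumes mem from the shared
// "gpu-0" counter set and exposes the same amount as its own capacity.
func partition(name, mem string) resourceapi.Device {
	return resourceapi.Device{
		Name: name,
		Basic: &resourceapi.BasicDevice{
			Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{
				"memory": {Value: resource.MustParse(mem)},
			},
			ConsumesCounter: []resourceapi.DeviceCounterConsumption{{
				SharedCounter: "gpu-0",
				Counters: map[string]resourceapi.Counter{
					"memory": {Value: resource.MustParse(mem)},
				},
			}},
		},
	}
}

func main() {
	pool := examplePool()
	fmt.Println(len(pool.Slices[0].Devices), "partition devices share counter set", pool.Slices[0].SharedCounters[0].Name)
}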