Fix DRAConsumableCapacity to be able to allocate the same device that previously consumed the counterSet

Signed-off-by: Sunyanan Choochotkaew <sunyanan.choochotkaew1@ibm.com>
Sunyanan Choochotkaew
2025-09-17 15:37:52 +09:00
parent 71fa43e37f
commit fb228c4704
2 changed files with 151 additions and 14 deletions
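
The core of the fix is in the allocator: when a device allows multiple allocations and some of its consumable capacity is already in use, the shared-counter availability check must be skipped, because the device's counters were consumed by its own earlier allocation and a later allocation of the same device should not fail for that reason. Below is a minimal, self-contained sketch of that guard; the struct and field names are simplified assumptions for illustration, not the allocator's real types (those appear in the diff below).

package main

import "fmt"

// Simplified stand-in for a partitionable, multi-allocatable device.
// The real allocator tracks counters per CounterSet and capacity per share;
// two booleans are enough here to show the shape of the guard.
type device struct {
	allowMultipleAllocations bool
	capacityInUse            bool // some consumable capacity already allocated
	countersAvailable        bool // the shared counter set still has room
}

// canAllocate mirrors the fix: skip the counter availability check when the
// device allows multiple allocations and already has capacity in use, since
// its counters were consumed by its own earlier allocation.
func canAllocate(d device) bool {
	skipCounterCheck := d.allowMultipleAllocations && d.capacityInUse
	if !skipCounterCheck && !d.countersAvailable {
		return false
	}
	return true
}

func main() {
	// A second allocation of the same device: the counter set is exhausted
	// (by this device itself), but capacity is already in use, so the check
	// is skipped and the allocation can proceed.
	fmt.Println(canAllocate(device{allowMultipleAllocations: true, capacityInUse: true, countersAvailable: false})) // true
	// A device that does not allow multiple allocations still fails when the
	// counters are exhausted.
	fmt.Println(canAllocate(device{allowMultipleAllocations: false, capacityInUse: false, countersAvailable: false})) // false
}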


@@ -4876,17 +4876,45 @@ func TestAllocator(t *testing.T,
},
"allow-multiple-allocations-with-partitionable-device": {
features: Features{
PrioritizedList: true,
PartitionableDevices: true,
ConsumableCapacity: true,
},
claimsToAllocate: objects(
claimWithRequests(claim0, nil, request(req0, classA, 1)),
claimWithRequests(claim1, nil, request(req0, classA, 1)),
claim(claim0).withRequests(
deviceRequest(req0, classA, 1),
requestWithPrioritizedList(req1,
subRequest(subReq0, classA, 1, resourceapi.DeviceSelector{
CEL: &resourceapi.CELDeviceSelector{
Expression: fmt.Sprintf(`device.capacity["%s"].memory.compareTo(quantity("6Gi")) >= 0`, driverA),
}},
),
subRequest(subReq1, classA, 1, resourceapi.DeviceSelector{
CEL: &resourceapi.CELDeviceSelector{
Expression: fmt.Sprintf(`device.capacity["%s"].memory.compareTo(quantity("4Gi")) >= 0`, driverA),
}},
),
),
),
),
classes: objects(class(classA, driverA)),
classes: objects(classWithAllowMultipleAllocations(classA, driverA, true)),
slices: unwrap(
slice(slice1, node1, pool1, driverA,
device(device1, nil, nil).withDeviceCounterConsumption(
device(device1, fromCounters, nil).withDeviceCounterConsumption(
deviceCounterConsumption(counterSet1,
map[string]resource.Quantity{
"memory": resource.MustParse("4Gi"),
},
),
).withAllowMultipleAllocations(),
device(device2, fromCounters, nil).withDeviceCounterConsumption(
deviceCounterConsumption(counterSet1,
map[string]resource.Quantity{
"memory": resource.MustParse("6Gi"),
},
),
).withAllowMultipleAllocations(),
device(device3, fromCounters, nil).withDeviceCounterConsumption(
deviceCounterConsumption(counterSet1,
map[string]resource.Quantity{
"memory": resource.MustParse("4Gi"),
@@ -4902,16 +4930,112 @@ func TestAllocator(t *testing.T,
),
),
node: node(node1, region1),
expectResults: []any{
allocationResult(
localNodeSelector(node1),
deviceRequestAllocationResult(req0, driverA, pool1, device1).withConsumedCapacity(&fixedShareID, nil),
),
allocationResult(
localNodeSelector(node1),
deviceRequestAllocationResult(req0, driverA, pool1, device1).withConsumedCapacity(&fixedShareID, nil),
),
expectResults: []any{allocationResult(
localNodeSelector(node1),
deviceRequestAllocationResult(req0, driverA, pool1, device1).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{"memory": resource.MustParse("4Gi")}),
deviceRequestAllocationResult(req1SubReq1, driverA, pool1, device3).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{"memory": resource.MustParse("4Gi")}),
)},
},
"consumable-capacity-with-partitionable-device-multiple-capacity-pools": {
// This test case covers the integration of the PrioritizedList, PartitionableDevices, and ConsumableCapacity features.
features: Features{
PrioritizedList: true,
PartitionableDevices: true,
ConsumableCapacity: true,
},
// There are two basic requests (req0 and req2), requesting 1 and 2 units of capacity0, respectively.
// In addition, there is one request (req1) with a prioritized list.
// The prioritized request prefers devices with capacity0 >= 4 over those with capacity0 >= 2.
claimsToAllocate: objects(
claim(claim0).withRequests(
deviceRequest(req0, classA, 1).withCapacityRequest(ptr.To(one)),
deviceRequest(req2, classA, 1).withCapacityRequest(ptr.To(two)),
requestWithPrioritizedList(req1,
subRequest(subReq0, classA, 1, resourceapi.DeviceSelector{
CEL: &resourceapi.CELDeviceSelector{
Expression: fmt.Sprintf(`device.capacity["%s"]["%s"].compareTo(quantity("4")) >= 0`, driverA, capacity0),
}}).withCapacityRequest(ptr.To(one)),
subRequest(subReq1, classA, 1, resourceapi.DeviceSelector{
CEL: &resourceapi.CELDeviceSelector{
Expression: fmt.Sprintf(`device.capacity["%s"]["%s"].compareTo(quantity("2")) >= 0`, driverA, capacity0),
}}).withCapacityRequest(ptr.To(one)),
),
),
),
classes: objects(class(classA, driverA)),
// There are three devices, each consuming counters from the PartitionableDevices counter sets.
// All devices can be allocated multiple times (`allowMultipleAllocations=true`).
// The counter sets provide (capacity0, capacity1) = (4, 8).
// device1, device2, and device3 consume (2, 4), (4, 4), and (2, 4) of (capacity0, capacity1), respectively.
// i.e., only two combinations are valid: [device1, device3] or [device2].
// The Capacity.RequestPolicy of ConsumableCapacity forces capacity1 to be consumed under a range policy of (min, step, max) = (2, 2, 4).
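// Counter budget: counterSet1 offers 4 units of capacity0, so device1 + device3 (2 + 2 = 4) fit together,
// while any combination that includes device2 alongside another device (4 + 2 = 6) would exceed the budget.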
slices: unwrap(
slice(slice1, node1, pool1, driverA,
device(device1, fromCounters, nil).withDeviceCounterConsumption(
deviceCounterConsumption(counterSet1,
map[string]resource.Quantity{
capacity0: two,
},
),
deviceCounterConsumption(counterSet2,
map[string]resource.Quantity{
capacity1: four,
},
),
).withAllowMultipleAllocations().withCapacityRequestPolicyRange((map[resourceapi.QualifiedName]resource.Quantity{capacity1: four})),
device(device2, fromCounters, nil).withDeviceCounterConsumption(
deviceCounterConsumption(counterSet1,
map[string]resource.Quantity{
capacity0: four,
},
),
deviceCounterConsumption(counterSet2,
map[string]resource.Quantity{
capacity1: four,
},
),
).withAllowMultipleAllocations().withCapacityRequestPolicyRange((map[resourceapi.QualifiedName]resource.Quantity{capacity1: four})),
device(device3, fromCounters, nil).withDeviceCounterConsumption(
deviceCounterConsumption(counterSet1,
map[string]resource.Quantity{
capacity0: two,
},
),
deviceCounterConsumption(counterSet2,
map[string]resource.Quantity{
capacity1: four,
},
),
).withAllowMultipleAllocations().withCapacityRequestPolicyRange((map[resourceapi.QualifiedName]resource.Quantity{capacity1: four})),
).withCounterSet(
counterSet(counterSet1,
map[string]resource.Quantity{
capacity0: four,
},
),
counterSet(counterSet2,
map[string]resource.Quantity{
capacity1: resource.MustParse("8"),
},
),
),
),
node: node(node1, region1),
// Expected results:
// - [req0] The scheduler should be able to allocate device1 to req0.
// 1 unit of capacity0 is consumed according to the requested capacity, and 2 units of capacity1 are consumed according to the RequestPolicy.
// - [req2] device1 does not have enough capacity0 remaining for req2, which requests 2 units (only 1 unit remains).
// The scheduler must also skip device2 due to the CounterSet constraint.
// Therefore, device3 is allocated to req2, providing 2 units of capacity0 and 2 units of capacity1.
// - [req1] The first prioritized subrequest of req1 cannot find a device because device2, the only device with capacity0 >= 4, cannot be allocated.
// The second prioritized subrequest is then chosen and consumes the remaining 1 unit of capacity0 on device1.
// Consequently, device1 is allocated to subreq1 of req1 with 1 unit of capacity0 and 2 units of capacity1.
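// Capacity tally on device1: capacity0 = 2 in total, 1 (req0) + 1 (req1/subReq1) = 2 consumed;
// capacity1 = 4 in total, 2 (req0) + 2 (req1/subReq1) = 4 consumed.
// counterSet1 tally: 2 (device1) + 2 (device3) = 4 of 4 units of capacity0 consumed, leaving no room for device2.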
expectResults: []any{allocationResult(
localNodeSelector(node1),
deviceRequestAllocationResult(req0, driverA, pool1, device1).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{capacity0: one, capacity1: two}),
deviceRequestAllocationResult(req2, driverA, pool1, device3).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{capacity0: two, capacity1: two}),
deviceRequestAllocationResult(req1SubReq1, driverA, pool1, device1).withConsumedCapacity(&fixedShareID, map[resourceapi.QualifiedName]resource.Quantity{capacity0: one, capacity1: two}),
)},
},
"distinct-constraint-one-multi-allocatable-device-with-distinct-constraint": {
features: Features{


@@ -1235,8 +1235,11 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
return false, nil, nil
}
// Skip the counter availability check for devices that allow multiple allocations and already have some capacity in use.
skipCounterCheck := allowMultipleAllocations && alloc.deviceCapacityInUse(device.id)
// The API validation logic has already verified that the counter sets referenced by ConsumesCounters exist in SharedCounters.
if len(device.ConsumesCounters) > 0 {
if !skipCounterCheck && len(device.ConsumesCounters) > 0 {
// If a device consumes counters from a counter set, verify that
// there are sufficient counters available.
ok, err := alloc.checkAvailableCounters(device)
@@ -1502,6 +1505,11 @@ func (alloc *allocator) deviceInUse(deviceID DeviceID) bool {
return alloc.allocatedState.AllocatedDevices.Has(deviceID) || alloc.allocatingDeviceForAnyClaim(deviceID)
}
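// deviceCapacityInUse reports whether some consumable capacity of the device has already been
// allocated or is being allocated for another claim in the current pass.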
func (alloc *allocator) deviceCapacityInUse(deviceID DeviceID) bool {
_, found := alloc.allocatedState.AggregatedCapacity[deviceID]
return found || alloc.allocatingCapacityForAnyClaim(deviceID)
}
func (alloc *allocator) allocatingDeviceForAnyClaim(deviceID DeviceID) bool {
return alloc.allocatingDevices[deviceID].Len() > 0
}
@@ -1510,6 +1518,11 @@ func (alloc *allocator) allocatingDeviceForClaim(deviceID DeviceID, claimIndex i
return alloc.allocatingDevices[deviceID].Has(claimIndex)
}
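// allocatingCapacityForAnyClaim reports whether consumable capacity of the device is being
// allocated for any claim in the current allocation pass.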
func (alloc *allocator) allocatingCapacityForAnyClaim(deviceID DeviceID) bool {
_, found := alloc.allocatingCapacity[deviceID]
return found
}
// deallocateCountersForDevice subtracts the consumed counters of the provided
// device from the consumedCounters data structure.
func (alloc *allocator) deallocateCountersForDevice(device deviceWithID) {