DRA scheduler: code cleanups

Looking up the slice can be avoided by storing it when allocating a device.
The AllocationResult struct is small enough that it can be copied by value.
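
To make the two cleanups concrete, here is a minimal, self-contained sketch. The names (Slice, Result, allocate) are hypothetical stand-ins rather than the scheduler's own API: the allocator remembers which slice a device came from instead of searching for it afterwards, and it hands back small result structs by value rather than behind pointers.

    package main

    import "fmt"

    // Hypothetical stand-in types, for illustration only.
    type Slice struct{ NodeName string }

    type Result struct {
        Device string
        Slice  *Slice // remembered at allocation time, so no lookup is needed later
    }

    // allocate returns results by value; the structs are small, so copying is cheap.
    func allocate(slices []*Slice) []Result {
        results := make([]Result, 0, len(slices))
        for i, s := range slices {
            results = append(results, Result{Device: fmt.Sprintf("dev-%d", i), Slice: s})
        }
        return results
    }

    func main() {
        results := allocate([]*Slice{{NodeName: "node-a"}})
        // Callers that still need a pointer can take the address of an element.
        first := &results[0]
        fmt.Println(first.Device, first.Slice.NodeName)
    }

The real change below follows the same shape: Allocate returns []resourceapi.AllocationResult, and each internal device result keeps a *draapi.ResourceSlice so the node selector can be built without another pool scan.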

    goos: linux
    goarch: amd64
    pkg: k8s.io/kubernetes/test/integration/scheduler_perf
    cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz
                                                                                       │            before            │                       after                        │
                                                                                       │ SchedulingThroughput/Average │ SchedulingThroughput/Average  vs base              │
    PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36                      33.30 ± 2%                     33.95 ± 4%       ~ (p=0.288 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36                     105.3 ± 2%                     105.8 ± 2%       ~ (p=0.524 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36                     100.8 ± 1%                     100.7 ± 1%       ~ (p=0.738 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36                      90.96 ± 2%                     90.78 ± 1%       ~ (p=0.952 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36                      49.84 ± 4%                     50.51 ± 7%       ~ (p=0.485 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36                      103.8 ± 1%                     103.7 ± 5%       ~ (p=0.582 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36                      27.21 ± 7%                     28.50 ± 2%       ~ (p=0.065 n=6)
    geomean                                                                                                64.26                          64.99       +1.14%
Author: Patrick Ohly
Date:   2024-09-04 16:42:03 +02:00
Commit: 941d17b3b8 (parent 1246898315)
4 changed files with 78 additions and 72 deletions

@@ -87,7 +87,7 @@ type stateData struct {
     informationsForClaim []informationForClaim

     // nodeAllocations caches the result of Filter for the nodes.
-    nodeAllocations map[string][]*resourceapi.AllocationResult
+    nodeAllocations map[string][]resourceapi.AllocationResult
 }

 func (d *stateData) Clone() framework.StateData {
@@ -545,7 +545,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
             return nil, statusError(logger, err)
         }
         s.allocator = allocator
-        s.nodeAllocations = make(map[string][]*resourceapi.AllocationResult)
+        s.nodeAllocations = make(map[string][]resourceapi.AllocationResult)
     }

     s.claims = claims
@@ -634,7 +634,7 @@ func (pl *DynamicResources) Filter(ctx context.Context, cs *framework.CycleState
     }

     // Use allocator to check the node and cache the result in case that the node is picked.
-    var allocations []*resourceapi.AllocationResult
+    var allocations []resourceapi.AllocationResult
     if state.allocator != nil {
         allocCtx := ctx
         if loggerV := logger.V(5); loggerV.Enabled() {
@@ -782,7 +782,7 @@ func (pl *DynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
         if index < 0 {
             return statusError(logger, fmt.Errorf("internal error, claim %s with allocation not found", claim.Name))
         }
-        allocation := allocations[i]
+        allocation := &allocations[i]
         state.informationsForClaim[index].allocation = allocation

         // Strictly speaking, we don't need to store the full modified object.

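The Reserve hunk above relies on a Go detail worth spelling out: slice elements are addressable, so code that still wants a *resourceapi.AllocationResult can take the address of the cached value. A small hedged illustration with a hypothetical AllocationResult stand-in, not the plugin's real cache:

    package main

    import "fmt"

    // AllocationResult is a hypothetical stand-in for resourceapi.AllocationResult.
    type AllocationResult struct{ Device string }

    func main() {
        // Shaped like the plugin's nodeAllocations cache: value slices keyed by node name.
        nodeAllocations := map[string][]AllocationResult{
            "node-a": {{Device: "gpu-0"}},
        }
        allocations := nodeAllocations["node-a"]
        // &allocations[0] points into the slice's backing array and stays valid
        // as long as the cached slice is not reallocated by a later append.
        allocation := &allocations[0]
        fmt.Println(allocation.Device)
    }

This appears safe here because each node's entry is written once when Filter caches its result and is only read afterwards.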
@@ -96,7 +96,7 @@ func (a *Allocator) ClaimsToAllocate() []*resourceapi.ResourceClaim {
 // additional value. A name can also be useful because log messages do not
 // have a common prefix. V(5) is used for one-time log entries, V(6) for important
 // progress reports, and V(7) for detailed debug output.
-func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []*resourceapi.AllocationResult, finalErr error) {
+func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []resourceapi.AllocationResult, finalErr error) {
     alloc := &allocator{
         Allocator:            a,
         ctx:                  ctx, // all methods share the same a and thus ctx
@@ -104,7 +104,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
         deviceMatchesRequest: make(map[matchKey]bool),
         constraints:          make([][]constraint, len(a.claimsToAllocate)),
         requestData:          make(map[requestIndices]requestData),
-        result:               make([]*resourceapi.AllocationResult, len(a.claimsToAllocate)),
+        result:               make([]internalAllocationResult, len(a.claimsToAllocate)),
     }
     alloc.logger.V(5).Info("Starting allocation", "numClaims", len(alloc.claimsToAllocate))
     defer alloc.logger.V(5).Info("Done with allocation", "success", len(finalResult) == len(alloc.claimsToAllocate), "err", finalErr)
@@ -206,7 +206,12 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
                 return nil, err
             }
             if selectable {
-                requestData.allDevices = append(requestData.allDevices, deviceWithID{device: slice.Spec.Devices[deviceIndex].Basic, DeviceID: DeviceID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name, Device: slice.Spec.Devices[deviceIndex].Name}})
+                device := deviceWithID{
+                    id:    DeviceID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name, Device: slice.Spec.Devices[deviceIndex].Name},
+                    basic: slice.Spec.Devices[deviceIndex].Basic,
+                    slice: slice,
+                }
+                requestData.allDevices = append(requestData.allDevices, device)
             }
         }
     }
@@ -228,16 +233,12 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
         // If we don't, then we can pre-allocate the result slices for
         // appending the actual results later.
-        alloc.result[claimIndex] = &resourceapi.AllocationResult{
-            Devices: resourceapi.DeviceAllocationResult{
-                Results: make([]resourceapi.DeviceRequestAllocationResult, 0, numDevicesPerClaim),
-            },
-        }
+        alloc.result[claimIndex].devices = make([]internalDeviceResult, 0, numDevicesPerClaim)

         // Constraints are assumed to be monotonic: once a constraint returns
         // false, adding more devices will not cause it to return true. This
         // allows the search to stop early once a constraint returns false.
-        var constraints = make([]constraint, len(claim.Spec.Devices.Constraints))
+        constraints := make([]constraint, len(claim.Spec.Devices.Constraints))
         for i, constraint := range claim.Spec.Devices.Constraints {
             switch {
             case constraint.MatchAttribute != nil:
@@ -306,8 +307,20 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
         return nil, nil
     }

-    for claimIndex, allocationResult := range alloc.result {
+    result := make([]resourceapi.AllocationResult, len(alloc.result))
+    for claimIndex, internalResult := range alloc.result {
         claim := alloc.claimsToAllocate[claimIndex]
+        allocationResult := &result[claimIndex]
+        allocationResult.Devices.Results = make([]resourceapi.DeviceRequestAllocationResult, len(internalResult.devices))
+        for i, internal := range internalResult.devices {
+            allocationResult.Devices.Results[i] = resourceapi.DeviceRequestAllocationResult{
+                Request:     internal.request,
+                Driver:      internal.id.Driver.String(),
+                Pool:        internal.id.Pool.String(),
+                Device:      internal.id.Device.String(),
+                AdminAccess: internal.adminAccess,
+            }
+        }

         // Populate configs.
         for requestIndex := range claim.Spec.Devices.Requests {
@@ -331,14 +344,14 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
         }

         // Determine node selector.
-        nodeSelector, err := alloc.createNodeSelector(allocationResult)
+        nodeSelector, err := alloc.createNodeSelector(internalResult.devices)
         if err != nil {
             return nil, fmt.Errorf("create NodeSelector for claim %s: %w", claim.Name, err)
         }
         allocationResult.NodeSelector = nodeSelector
     }

-    return alloc.result, nil
+    return result, nil
 }

 // errStop is a special error that gets returned by allocateOne if it detects
@@ -356,7 +369,7 @@ type allocator struct {
     constraints          [][]constraint                 // one list of constraints per claim
     requestData          map[requestIndices]requestData // one entry per request
     allocated            map[DeviceID]bool
-    result               []*resourceapi.AllocationResult
+    result               []internalAllocationResult
 }

 // matchKey identifies a device/request pair.
@@ -386,8 +399,20 @@ type requestData struct {
 }

 type deviceWithID struct {
-    DeviceID
-    device *draapi.BasicDevice
+    id    DeviceID
+    basic *draapi.BasicDevice
+    slice *draapi.ResourceSlice
+}
+
+type internalAllocationResult struct {
+    devices []internalDeviceResult
+}
+
+type internalDeviceResult struct {
+    request     string
+    id          DeviceID
+    slice       *draapi.ResourceSlice
+    adminAccess *bool
 }

 type constraint interface {
@@ -542,7 +567,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
         // For "all" devices we already know which ones we need. We
         // just need to check whether we can use them.
         deviceWithID := requestData.allDevices[r.deviceIndex]
-        success, _, err := alloc.allocateDevice(r, deviceWithID.device, deviceWithID.DeviceID, true)
+        success, _, err := alloc.allocateDevice(r, deviceWithID, true)
         if err != nil {
             return false, err
         }
@@ -593,7 +618,12 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
         }

         // Finally treat as allocated and move on to the next device.
-        allocated, deallocate, err := alloc.allocateDevice(r, slice.Spec.Devices[deviceIndex].Basic, deviceID, false)
+        device := deviceWithID{
+            id:    deviceID,
+            basic: slice.Spec.Devices[deviceIndex].Basic,
+            slice: slice,
+        }
+        allocated, deallocate, err := alloc.allocateDevice(r, device, false)
         if err != nil {
             return false, err
         }
@@ -716,28 +746,28 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDev
 // as if that candidate had been allocated. If allocation cannot continue later
 // and must try something else, then the rollback function can be invoked to
 // restore the previous state.
-func (alloc *allocator) allocateDevice(r deviceIndices, device *draapi.BasicDevice, deviceID DeviceID, must bool) (bool, func(), error) {
+func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, must bool) (bool, func(), error) {
     claim := alloc.claimsToAllocate[r.claimIndex]
     request := &claim.Spec.Devices.Requests[r.requestIndex]
     adminAccess := ptr.Deref(request.AdminAccess, false)
-    if !adminAccess && alloc.allocated[deviceID] {
-        alloc.logger.V(7).Info("Device in use", "device", deviceID)
+    if !adminAccess && alloc.allocated[device.id] {
+        alloc.logger.V(7).Info("Device in use", "device", device.id)
         return false, nil, nil
     }

     // It's available. Now check constraints.
     for i, constraint := range alloc.constraints[r.claimIndex] {
-        added := constraint.add(request.Name, device, deviceID)
+        added := constraint.add(request.Name, device.basic, device.id)
         if !added {
             if must {
                 // It does not make sense to declare a claim where a constraint prevents getting
                 // all devices. Treat this as an error.
-                return false, nil, fmt.Errorf("claim %s, request %s: cannot add device %s because a claim constraint would not be satisfied", klog.KObj(claim), request.Name, deviceID)
+                return false, nil, fmt.Errorf("claim %s, request %s: cannot add device %s because a claim constraint would not be satisfied", klog.KObj(claim), request.Name, device.id)
             }

             // Roll back for all previous constraints before we return.
             for e := 0; e < i; e++ {
-                alloc.constraints[r.claimIndex][e].remove(request.Name, device, deviceID)
+                alloc.constraints[r.claimIndex][e].remove(request.Name, device.basic, device.id)
             }
             return false, nil, nil
         }
@@ -745,49 +775,45 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device *draapi.BasicDevi

     // All constraints satisfied. Mark as in use (unless we do admin access)
     // and record the result.
-    alloc.logger.V(7).Info("Device allocated", "device", deviceID)
+    alloc.logger.V(7).Info("Device allocated", "device", device.id)
     if !adminAccess {
-        alloc.allocated[deviceID] = true
+        alloc.allocated[device.id] = true
     }
-    result := resourceapi.DeviceRequestAllocationResult{
-        Request: request.Name,
-        Driver:  deviceID.Driver.String(),
-        Pool:    deviceID.Pool.String(),
-        Device:  deviceID.Device.String(),
+    result := internalDeviceResult{
+        request: request.Name,
+        id:      device.id,
+        slice:   device.slice,
     }
     if adminAccess {
-        result.AdminAccess = &adminAccess
+        result.adminAccess = &adminAccess
     }
-    previousNumResults := len(alloc.result[r.claimIndex].Devices.Results)
-    alloc.result[r.claimIndex].Devices.Results = append(alloc.result[r.claimIndex].Devices.Results, result)
+    previousNumResults := len(alloc.result[r.claimIndex].devices)
+    alloc.result[r.claimIndex].devices = append(alloc.result[r.claimIndex].devices, result)

     return true, func() {
         for _, constraint := range alloc.constraints[r.claimIndex] {
-            constraint.remove(request.Name, device, deviceID)
+            constraint.remove(request.Name, device.basic, device.id)
         }
         if !adminAccess {
-            alloc.allocated[deviceID] = false
+            alloc.allocated[device.id] = false
         }
         // Truncate, but keep the underlying slice.
-        alloc.result[r.claimIndex].Devices.Results = alloc.result[r.claimIndex].Devices.Results[:previousNumResults]
-        alloc.logger.V(7).Info("Device deallocated", "device", deviceID)
+        alloc.result[r.claimIndex].devices = alloc.result[r.claimIndex].devices[:previousNumResults]
+        alloc.logger.V(7).Info("Device deallocated", "device", device.id)
     }, nil
 }

 // createNodeSelector constructs a node selector for the allocation, if needed,
 // otherwise it returns nil.
-func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationResult) (*v1.NodeSelector, error) {
+func (alloc *allocator) createNodeSelector(result []internalDeviceResult) (*v1.NodeSelector, error) {
     // Selector with one term. That term gets extended with additional
     // requirements from the different devices.
     nodeSelector := &v1.NodeSelector{
         NodeSelectorTerms: []v1.NodeSelectorTerm{{}},
     }

-    for _, deviceAllocation := range allocation.Devices.Results {
-        slice := alloc.findSlice(deviceAllocation)
-        if slice == nil {
-            return nil, fmt.Errorf("internal error: device %+v not found in pools", deviceAllocation)
-        }
+    for i := range result {
+        slice := result[i].slice
         if slice.Spec.NodeName != "" {
             // At least one device is local to one node. This
             // restricts the allocation to that node.
@@ -825,26 +851,6 @@ func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationRes
     return nil, nil
 }

-func (alloc *allocator) findSlice(deviceAllocation resourceapi.DeviceRequestAllocationResult) *draapi.ResourceSlice {
-    driverName := draapi.MakeUniqueString(deviceAllocation.Driver)
-    poolName := draapi.MakeUniqueString(deviceAllocation.Pool)
-    deviceName := draapi.MakeUniqueString(deviceAllocation.Device)
-    for _, pool := range alloc.pools {
-        if pool.Driver != driverName ||
-            pool.Pool != poolName {
-            continue
-        }
-        for _, slice := range pool.Slices {
-            for _, device := range slice.Spec.Devices {
-                if device.Name == deviceName {
-                    return slice
-                }
-            }
-        }
-    }
-    return nil
-}
-
 func addNewNodeSelectorRequirements(from []v1.NodeSelectorRequirement, to *[]v1.NodeSelectorRequirement) {
     for _, requirement := range from {
         if !containsNodeSelectorRequirement(*to, requirement) {

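For comparison, the removed findSlice above had to rescan every pool, slice, and device just to recover where an allocated device lives; with the slice recorded at allocation time, that information is one field access away. A hedged sketch of the before/after lookup pattern using simplified, hypothetical types (not the real draapi structures):

    package sketch

    // Simplified stand-ins for pools, slices, and per-device results.
    type Slice struct {
        NodeName string
        Devices  []string
    }

    type Pool struct{ Slices []*Slice }

    // Before: roughly the shape of the removed helper, a nested scan per device.
    func findSlice(pools []*Pool, device string) *Slice {
        for _, pool := range pools {
            for _, slice := range pool.Slices {
                for _, d := range slice.Devices {
                    if d == device {
                        return slice
                    }
                }
            }
        }
        return nil
    }

    // After: the allocation step records the slice, so later code reads it directly.
    type deviceResult struct {
        device string
        slice  *Slice
    }

    func nodeNameFor(r deviceResult) string {
        return r.slice.NodeName // no search required
    }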
@@ -266,13 +266,13 @@ func localNodeSelector(nodeName string) *v1.NodeSelector {
 // allocationResult returns a matcher for one AllocationResult pointer with a list of
 // embedded device allocation results. The order of those results doesn't matter.
 func allocationResult(selector *v1.NodeSelector, results ...resourceapi.DeviceRequestAllocationResult) types.GomegaMatcher {
-    return gstruct.PointTo(gstruct.MatchFields(0, gstruct.Fields{
+    return gstruct.MatchFields(0, gstruct.Fields{
         "Devices": gstruct.MatchFields(0, gstruct.Fields{
             "Results": gomega.ConsistOf(results), // Order is irrelevant.
             "Config":  gomega.BeNil(),
         }),
         "NodeSelector": matchNodeSelector(selector),
-    }))
+    })
 }

 // matchNodeSelector returns a matcher for a node selector. The order
@@ -315,8 +315,8 @@ func matchNodeSelectorRequirement(requirement v1.NodeSelectorRequirement) types.
     })
 }

-func allocationResultWithConfig(selector *v1.NodeSelector, driver string, source resourceapi.AllocationConfigSource, attribute string, results ...resourceapi.DeviceRequestAllocationResult) *resourceapi.AllocationResult {
-    return &resourceapi.AllocationResult{
+func allocationResultWithConfig(selector *v1.NodeSelector, driver string, source resourceapi.AllocationConfigSource, attribute string, results ...resourceapi.DeviceRequestAllocationResult) resourceapi.AllocationResult {
+    return resourceapi.AllocationResult{
         Devices: resourceapi.DeviceAllocationResult{
             Results: results,
             Config: []resourceapi.DeviceAllocationConfiguration{

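Since Allocate now returns []resourceapi.AllocationResult by value, the matcher is applied to a struct directly instead of being wrapped in gstruct.PointTo. A hedged usage sketch, assuming the helpers above, a *testing.T named t, and made-up allocator, ctx, node, and device names:

    g := gomega.NewWithT(t)
    results, err := allocator.Allocate(ctx, node)
    g.Expect(err).ToNot(gomega.HaveOccurred())
    // Each element of results is a value, matched directly by gstruct.MatchFields.
    g.Expect(results).To(gomega.ConsistOf(
        allocationResult(localNodeSelector("worker-1"), resourceapi.DeviceRequestAllocationResult{
            Request: "req-0",
            Driver:  "driver.example.com",
            Pool:    "pool-a",
            Device:  "device-0",
        }),
    ))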
@@ -330,7 +330,7 @@ claims:
             tCtx.ExpectNoError(err, "allocate claim")
             if result != nil {
                 claim = claim.DeepCopy()
-                claim.Status.Allocation = result[0]
+                claim.Status.Allocation = &result[0]
                 claim, err := tCtx.Client().ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(tCtx, claim, metav1.UpdateOptions{})
                 tCtx.ExpectNoError(err, "update claim status with allocation")
                 tCtx.ExpectNoError(claimCache.Assume(claim), "assume claim")