DRA scheduler: ResourceSlice with unique strings

Using unique strings instead of normal strings speeds up allocation with
structured parameters because maps that use those strings as key no longer need
to build hashes of the string content. However, care must be taken to call
unique.Make as little as possible because it is costly.

Pre-allocating the map of allocated devices reduces the need to grow the map
when adding devices.

    goos: linux
    goarch: amd64
    pkg: k8s.io/kubernetes/test/integration/scheduler_perf
    cpu: Intel(R) Core(TM) i9-7980XE CPU @ 2.60GHz
                                                                                       │            before            │                        after                         │
                                                                                       │ SchedulingThroughput/Average │ SchedulingThroughput/Average  vs base                │
    PerfScheduling/SchedulingWithResourceClaimTemplateStructured/5000pods_500nodes-36                     18.06 ±  2%                     33.30 ± 2%   +84.31% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_100nodes-36                    104.7 ±  2%                     105.3 ± 2%         ~ (p=0.818 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/empty_500nodes-36                    96.62 ±  1%                    100.75 ± 1%    +4.28% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_100nodes-36                     83.00 ±  2%                     90.96 ± 2%    +9.59% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/half_500nodes-36                     32.45 ±  7%                     49.84 ± 4%   +53.60% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_100nodes-36                     95.22 ±  7%                    103.80 ± 1%    +9.00% (p=0.002 n=6)
    PerfScheduling/SteadyStateClusterResourceClaimTemplateStructured/full_500nodes-36                     9.111 ± 10%                    27.215 ± 7%  +198.69% (p=0.002 n=6)
    geomean                                                                                               45.86                           64.26        +40.12%
This commit is contained in:
Patrick Ohly
2024-09-04 14:34:35 +02:00
parent 7de6d070f2
commit 1246898315
10 changed files with 621 additions and 196 deletions

View File

@@ -30,6 +30,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@@ -45,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/utils/ptr"
)
const (
@@ -530,12 +532,15 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
// Claims are treated as "allocated" if they are in the assume cache
// or currently their allocation is in-flight. This does not change
// during filtering, so we can determine that once.
claimLister := &claimListerForAssumeCache{assumeCache: pl.claimAssumeCache, inFlightAllocations: &pl.inFlightAllocations}
allocatedClaims, err := claimLister.ListAllAllocated()
allocatedDevices := pl.listAllAllocatedDevices()
if err != nil {
return nil, statusError(logger, err)
}
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, staticClaimLister(allocatedClaims), pl.classLister, pl.sliceLister)
slices, err := pl.sliceLister.List(labels.Everything())
if err != nil {
return nil, statusError(logger, err)
}
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allocatedDevices, pl.classLister, slices)
if err != nil {
return nil, statusError(logger, err)
}
@@ -547,31 +552,33 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
return nil, nil
}
type claimListerForAssumeCache struct {
assumeCache *assumecache.AssumeCache
inFlightAllocations *sync.Map
}
func (cl *claimListerForAssumeCache) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
func (pl *DynamicResources) listAllAllocatedDevices() []structured.DeviceID {
// Probably not worth adding an index for?
objs := cl.assumeCache.List(nil)
allocated := make([]*resourceapi.ResourceClaim, 0, len(objs))
objs := pl.claimAssumeCache.List(nil)
var allocated []structured.DeviceID
for _, obj := range objs {
claim := obj.(*resourceapi.ResourceClaim)
if obj, ok := cl.inFlightAllocations.Load(claim.UID); ok {
if obj, ok := pl.inFlightAllocations.Load(claim.UID); ok {
claim = obj.(*resourceapi.ResourceClaim)
}
if claim.Status.Allocation != nil {
allocated = append(allocated, claim)
if claim.Status.Allocation == nil {
continue
}
for _, result := range claim.Status.Allocation.Devices.Results {
// Kubernetes 1.31 did not set this, 1.32 always does.
// Supporting 1.31 is not worth the additional code that
// would have to be written (= looking up in request) because
// it is extremely unlikely that there really is a result
// that still exists in a cluster from 1.31 where this matters.
if ptr.Deref(result.AdminAccess, false) {
// Is not considered as allocated.
continue
}
deviceID := structured.MakeDeviceID(result.Driver, result.Pool, result.Device)
allocated = append(allocated, deviceID)
}
}
return allocated, nil
}
type staticClaimLister []*resourceapi.ResourceClaim
func (cl staticClaimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
return cl, nil
return allocated
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.

View File

@@ -0,0 +1,39 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"unique"
conversion "k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
)
var (
	// localSchemeBuilder collects registration functions; the generated
	// code registers RegisterConversions with it in init().
	localSchemeBuilder runtime.SchemeBuilder
	// AddToScheme applies all collected registration functions to a scheme.
	AddToScheme = localSchemeBuilder.AddToScheme
)
// Convert_api_UniqueString_To_string converts a UniqueString back into a
// plain string. A null UniqueString becomes the empty string (see
// UniqueString.String).
func Convert_api_UniqueString_To_string(in *UniqueString, out *string, s conversion.Scope) error {
	*out = in.String()
	return nil
}
// Convert_string_To_api_UniqueString interns the string via unique.Make and
// stores the resulting handle.
//
// NOTE: unique.Make is comparatively costly, so this conversion should run
// as rarely as possible (once per incoming object), not in hot paths.
func Convert_string_To_api_UniqueString(in *string, out *UniqueString, s conversion.Scope) error {
	*out = UniqueString(unique.Make(*in))
	return nil
}

View File

@@ -0,0 +1,22 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package api contains a variant of the API where strings are unique. These
// unique strings are faster to compare and more efficient when used as key in
// a map.
//
// +k8s:conversion-gen=k8s.io/api/resource/v1alpha3
package api

View File

@@ -0,0 +1,64 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// ResourceSlice is a variant of v1alpha3.ResourceSlice in which strings that
// are frequently compared or used as map keys are interned as UniqueString
// (see the package documentation).
type ResourceSlice struct {
	metav1.TypeMeta
	metav1.ObjectMeta

	Spec ResourceSliceSpec
}

// ResourceSliceSpec mirrors v1alpha3.ResourceSliceSpec with unique strings.
type ResourceSliceSpec struct {
	Driver       UniqueString
	Pool         ResourcePool
	NodeName     string
	NodeSelector *v1.NodeSelector
	AllNodes     bool
	Devices      []Device
}

// ResourcePool mirrors v1alpha3.ResourcePool with a unique pool name.
type ResourcePool struct {
	Name               UniqueString
	Generation         int64
	ResourceSliceCount int64
}

// Device mirrors v1alpha3.Device with a unique device name.
type Device struct {
	Name  UniqueString
	Basic *BasicDevice
}

// BasicDevice mirrors v1alpha3.BasicDevice.
type BasicDevice struct {
	Attributes map[QualifiedName]DeviceAttribute
	Capacity   map[QualifiedName]resource.Quantity
}

// QualifiedName mirrors v1alpha3.QualifiedName.
type QualifiedName string

// FullyQualifiedName mirrors v1alpha3.FullyQualifiedName.
type FullyQualifiedName string

// DeviceAttribute mirrors v1alpha3.DeviceAttribute.
// NOTE(review): presumably at most one value field is set per attribute,
// as in the v1alpha3 API — confirm against the source type.
type DeviceAttribute struct {
	IntValue     *int64
	BoolValue    *bool
	StringValue  *string
	VersionValue *string
}

View File

@@ -0,0 +1,41 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"unique"
)
// NullUniqueString is a UniqueString which contains no string.
var NullUniqueString UniqueString
// UniqueString is a wrapper around [unique.Handle[string]].
type UniqueString unique.Handle[string]
// Returns the string that is stored in the UniqueString.
// If the UniqueString is null, the empty string is returned.
func (us UniqueString) String() string {
if us == NullUniqueString {
return ""
}
return unique.Handle[string](us).Value()
}
// MakeUniqueString constructs a new unique string.
func MakeUniqueString(str string) UniqueString {
return UniqueString(unique.Make(str))
}

View File

@@ -0,0 +1,298 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by conversion-gen. DO NOT EDIT.
package api
import (
unsafe "unsafe"
v1 "k8s.io/api/core/v1"
v1alpha3 "k8s.io/api/resource/v1alpha3"
resource "k8s.io/apimachinery/pkg/api/resource"
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
)
// init wires the generated conversions into the package's scheme builder so
// that AddToScheme picks them up.
func init() {
	localSchemeBuilder.Register(RegisterConversions)
}

// RegisterConversions adds conversion functions to the given scheme.
// Public to allow building arbitrary schemes.
func RegisterConversions(s *runtime.Scheme) error {
	if err := s.AddGeneratedConversionFunc((*BasicDevice)(nil), (*v1alpha3.BasicDevice)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_api_BasicDevice_To_v1alpha3_BasicDevice(a.(*BasicDevice), b.(*v1alpha3.BasicDevice), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*v1alpha3.BasicDevice)(nil), (*BasicDevice)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_v1alpha3_BasicDevice_To_api_BasicDevice(a.(*v1alpha3.BasicDevice), b.(*BasicDevice), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*Device)(nil), (*v1alpha3.Device)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_api_Device_To_v1alpha3_Device(a.(*Device), b.(*v1alpha3.Device), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*v1alpha3.Device)(nil), (*Device)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_v1alpha3_Device_To_api_Device(a.(*v1alpha3.Device), b.(*Device), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*DeviceAttribute)(nil), (*v1alpha3.DeviceAttribute)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(a.(*DeviceAttribute), b.(*v1alpha3.DeviceAttribute), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*v1alpha3.DeviceAttribute)(nil), (*DeviceAttribute)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(a.(*v1alpha3.DeviceAttribute), b.(*DeviceAttribute), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*ResourcePool)(nil), (*v1alpha3.ResourcePool)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_api_ResourcePool_To_v1alpha3_ResourcePool(a.(*ResourcePool), b.(*v1alpha3.ResourcePool), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourcePool)(nil), (*ResourcePool)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_v1alpha3_ResourcePool_To_api_ResourcePool(a.(*v1alpha3.ResourcePool), b.(*ResourcePool), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*ResourceSlice)(nil), (*v1alpha3.ResourceSlice)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice(a.(*ResourceSlice), b.(*v1alpha3.ResourceSlice), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourceSlice)(nil), (*ResourceSlice)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(a.(*v1alpha3.ResourceSlice), b.(*ResourceSlice), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*ResourceSliceSpec)(nil), (*v1alpha3.ResourceSliceSpec)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(a.(*ResourceSliceSpec), b.(*v1alpha3.ResourceSliceSpec), scope)
	}); err != nil {
		return err
	}
	if err := s.AddGeneratedConversionFunc((*v1alpha3.ResourceSliceSpec)(nil), (*ResourceSliceSpec)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(a.(*v1alpha3.ResourceSliceSpec), b.(*ResourceSliceSpec), scope)
	}); err != nil {
		return err
	}
	// The UniqueString <-> string conversions are hand-written (see
	// conversion.go), hence AddConversionFunc rather than
	// AddGeneratedConversionFunc.
	if err := s.AddConversionFunc((*UniqueString)(nil), (*string)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_api_UniqueString_To_string(a.(*UniqueString), b.(*string), scope)
	}); err != nil {
		return err
	}
	if err := s.AddConversionFunc((*string)(nil), (*UniqueString)(nil), func(a, b interface{}, scope conversion.Scope) error {
		return Convert_string_To_api_UniqueString(a.(*string), b.(*UniqueString), scope)
	}); err != nil {
		return err
	}
	return nil
}
func autoConvert_api_BasicDevice_To_v1alpha3_BasicDevice(in *BasicDevice, out *v1alpha3.BasicDevice, s conversion.Scope) error {
	// The maps are reinterpreted in place via unsafe.Pointer instead of
	// being copied; this presumes the two map types have identical memory
	// layout (conversion-gen emits this form only when that holds).
	out.Attributes = *(*map[v1alpha3.QualifiedName]v1alpha3.DeviceAttribute)(unsafe.Pointer(&in.Attributes))
	out.Capacity = *(*map[v1alpha3.QualifiedName]resource.Quantity)(unsafe.Pointer(&in.Capacity))
	return nil
}

// Convert_api_BasicDevice_To_v1alpha3_BasicDevice is an autogenerated conversion function.
func Convert_api_BasicDevice_To_v1alpha3_BasicDevice(in *BasicDevice, out *v1alpha3.BasicDevice, s conversion.Scope) error {
	return autoConvert_api_BasicDevice_To_v1alpha3_BasicDevice(in, out, s)
}

func autoConvert_v1alpha3_BasicDevice_To_api_BasicDevice(in *v1alpha3.BasicDevice, out *BasicDevice, s conversion.Scope) error {
	// Same in-place map reinterpretation as the forward direction.
	out.Attributes = *(*map[QualifiedName]DeviceAttribute)(unsafe.Pointer(&in.Attributes))
	out.Capacity = *(*map[QualifiedName]resource.Quantity)(unsafe.Pointer(&in.Capacity))
	return nil
}

// Convert_v1alpha3_BasicDevice_To_api_BasicDevice is an autogenerated conversion function.
func Convert_v1alpha3_BasicDevice_To_api_BasicDevice(in *v1alpha3.BasicDevice, out *BasicDevice, s conversion.Scope) error {
	return autoConvert_v1alpha3_BasicDevice_To_api_BasicDevice(in, out, s)
}

func autoConvert_api_Device_To_v1alpha3_Device(in *Device, out *v1alpha3.Device, s conversion.Scope) error {
	// Name must go through the hand-written UniqueString conversion;
	// the Basic pointer can be reinterpreted directly.
	if err := Convert_api_UniqueString_To_string(&in.Name, &out.Name, s); err != nil {
		return err
	}
	out.Basic = (*v1alpha3.BasicDevice)(unsafe.Pointer(in.Basic))
	return nil
}

// Convert_api_Device_To_v1alpha3_Device is an autogenerated conversion function.
func Convert_api_Device_To_v1alpha3_Device(in *Device, out *v1alpha3.Device, s conversion.Scope) error {
	return autoConvert_api_Device_To_v1alpha3_Device(in, out, s)
}

func autoConvert_v1alpha3_Device_To_api_Device(in *v1alpha3.Device, out *Device, s conversion.Scope) error {
	if err := Convert_string_To_api_UniqueString(&in.Name, &out.Name, s); err != nil {
		return err
	}
	out.Basic = (*BasicDevice)(unsafe.Pointer(in.Basic))
	return nil
}

// Convert_v1alpha3_Device_To_api_Device is an autogenerated conversion function.
func Convert_v1alpha3_Device_To_api_Device(in *v1alpha3.Device, out *Device, s conversion.Scope) error {
	return autoConvert_v1alpha3_Device_To_api_Device(in, out, s)
}

func autoConvert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in *DeviceAttribute, out *v1alpha3.DeviceAttribute, s conversion.Scope) error {
	// All value fields are plain pointers with identical types, so they
	// are shared, not deep-copied.
	out.IntValue = (*int64)(unsafe.Pointer(in.IntValue))
	out.BoolValue = (*bool)(unsafe.Pointer(in.BoolValue))
	out.StringValue = (*string)(unsafe.Pointer(in.StringValue))
	out.VersionValue = (*string)(unsafe.Pointer(in.VersionValue))
	return nil
}

// Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute is an autogenerated conversion function.
func Convert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in *DeviceAttribute, out *v1alpha3.DeviceAttribute, s conversion.Scope) error {
	return autoConvert_api_DeviceAttribute_To_v1alpha3_DeviceAttribute(in, out, s)
}

func autoConvert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in *v1alpha3.DeviceAttribute, out *DeviceAttribute, s conversion.Scope) error {
	out.IntValue = (*int64)(unsafe.Pointer(in.IntValue))
	out.BoolValue = (*bool)(unsafe.Pointer(in.BoolValue))
	out.StringValue = (*string)(unsafe.Pointer(in.StringValue))
	out.VersionValue = (*string)(unsafe.Pointer(in.VersionValue))
	return nil
}

// Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute is an autogenerated conversion function.
func Convert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in *v1alpha3.DeviceAttribute, out *DeviceAttribute, s conversion.Scope) error {
	return autoConvert_v1alpha3_DeviceAttribute_To_api_DeviceAttribute(in, out, s)
}
func autoConvert_api_ResourcePool_To_v1alpha3_ResourcePool(in *ResourcePool, out *v1alpha3.ResourcePool, s conversion.Scope) error {
	// Name goes through the hand-written UniqueString conversion.
	if err := Convert_api_UniqueString_To_string(&in.Name, &out.Name, s); err != nil {
		return err
	}
	out.Generation = in.Generation
	out.ResourceSliceCount = in.ResourceSliceCount
	return nil
}

// Convert_api_ResourcePool_To_v1alpha3_ResourcePool is an autogenerated conversion function.
func Convert_api_ResourcePool_To_v1alpha3_ResourcePool(in *ResourcePool, out *v1alpha3.ResourcePool, s conversion.Scope) error {
	return autoConvert_api_ResourcePool_To_v1alpha3_ResourcePool(in, out, s)
}

func autoConvert_v1alpha3_ResourcePool_To_api_ResourcePool(in *v1alpha3.ResourcePool, out *ResourcePool, s conversion.Scope) error {
	if err := Convert_string_To_api_UniqueString(&in.Name, &out.Name, s); err != nil {
		return err
	}
	out.Generation = in.Generation
	out.ResourceSliceCount = in.ResourceSliceCount
	return nil
}

// Convert_v1alpha3_ResourcePool_To_api_ResourcePool is an autogenerated conversion function.
func Convert_v1alpha3_ResourcePool_To_api_ResourcePool(in *v1alpha3.ResourcePool, out *ResourcePool, s conversion.Scope) error {
	return autoConvert_v1alpha3_ResourcePool_To_api_ResourcePool(in, out, s)
}

func autoConvert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in *ResourceSlice, out *v1alpha3.ResourceSlice, s conversion.Scope) error {
	// NOTE(review): TypeMeta is not copied here; presumably conversion-gen's
	// standard behavior — confirm callers do not rely on it.
	out.ObjectMeta = in.ObjectMeta
	if err := Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(&in.Spec, &out.Spec, s); err != nil {
		return err
	}
	return nil
}

// Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice is an autogenerated conversion function.
func Convert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in *ResourceSlice, out *v1alpha3.ResourceSlice, s conversion.Scope) error {
	return autoConvert_api_ResourceSlice_To_v1alpha3_ResourceSlice(in, out, s)
}

func autoConvert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in *v1alpha3.ResourceSlice, out *ResourceSlice, s conversion.Scope) error {
	out.ObjectMeta = in.ObjectMeta
	if err := Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(&in.Spec, &out.Spec, s); err != nil {
		return err
	}
	return nil
}

// Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice is an autogenerated conversion function.
func Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in *v1alpha3.ResourceSlice, out *ResourceSlice, s conversion.Scope) error {
	return autoConvert_v1alpha3_ResourceSlice_To_api_ResourceSlice(in, out, s)
}

func autoConvert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in *ResourceSliceSpec, out *v1alpha3.ResourceSliceSpec, s conversion.Scope) error {
	if err := Convert_api_UniqueString_To_string(&in.Driver, &out.Driver, s); err != nil {
		return err
	}
	if err := Convert_api_ResourcePool_To_v1alpha3_ResourcePool(&in.Pool, &out.Pool, s); err != nil {
		return err
	}
	out.NodeName = in.NodeName
	out.NodeSelector = (*v1.NodeSelector)(unsafe.Pointer(in.NodeSelector))
	out.AllNodes = in.AllNodes
	// Devices must be converted element by element because Device contains
	// a UniqueString and cannot simply be reinterpreted.
	if in.Devices != nil {
		in, out := &in.Devices, &out.Devices
		*out = make([]v1alpha3.Device, len(*in))
		for i := range *in {
			if err := Convert_api_Device_To_v1alpha3_Device(&(*in)[i], &(*out)[i], s); err != nil {
				return err
			}
		}
	} else {
		out.Devices = nil
	}
	return nil
}

// Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec is an autogenerated conversion function.
func Convert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in *ResourceSliceSpec, out *v1alpha3.ResourceSliceSpec, s conversion.Scope) error {
	return autoConvert_api_ResourceSliceSpec_To_v1alpha3_ResourceSliceSpec(in, out, s)
}

func autoConvert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in *v1alpha3.ResourceSliceSpec, out *ResourceSliceSpec, s conversion.Scope) error {
	if err := Convert_string_To_api_UniqueString(&in.Driver, &out.Driver, s); err != nil {
		return err
	}
	if err := Convert_v1alpha3_ResourcePool_To_api_ResourcePool(&in.Pool, &out.Pool, s); err != nil {
		return err
	}
	out.NodeName = in.NodeName
	out.NodeSelector = (*v1.NodeSelector)(unsafe.Pointer(in.NodeSelector))
	out.AllNodes = in.AllNodes
	if in.Devices != nil {
		in, out := &in.Devices, &out.Devices
		*out = make([]Device, len(*in))
		for i := range *in {
			if err := Convert_v1alpha3_Device_To_api_Device(&(*in)[i], &(*out)[i], s); err != nil {
				return err
			}
		}
	} else {
		out.Devices = nil
	}
	return nil
}

// Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec is an autogenerated conversion function.
func Convert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in *v1alpha3.ResourceSliceSpec, out *ResourceSliceSpec, s conversion.Scope) error {
	return autoConvert_v1alpha3_ResourceSliceSpec_To_api_ResourceSliceSpec(in, out, s)
}

View File

@@ -27,18 +27,12 @@ import (
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/util/sets"
resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
draapi "k8s.io/dynamic-resource-allocation/api"
"k8s.io/dynamic-resource-allocation/cel"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
// ClaimLister returns a subset of the claims that a
// resourcelisters.ResourceClaimLister would return.
type ClaimLister interface {
// ListAllAllocated returns only claims which are allocated.
ListAllAllocated() ([]*resourceapi.ResourceClaim, error)
}
// Allocator calculates how to allocate a set of unallocated claims which use
// structured parameters.
//
@@ -48,26 +42,29 @@ type ClaimLister interface {
type Allocator struct {
adminAccessEnabled bool
claimsToAllocate []*resourceapi.ResourceClaim
claimLister ClaimLister
allocatedDevices []DeviceID
classLister resourcelisters.DeviceClassLister
sliceLister resourcelisters.ResourceSliceLister
slices []*resourceapi.ResourceSlice
}
// NewAllocator returns an allocator for a certain set of claims or an error if
// some problem was detected which makes it impossible to allocate claims.
//
// The returned Allocator is stateless. It operates on the claims, allocated
// devices, and slices that were passed in when it was created.
func NewAllocator(ctx context.Context,
adminAccessEnabled bool,
claimsToAllocate []*resourceapi.ResourceClaim,
claimLister ClaimLister,
allocatedDevices []DeviceID,
classLister resourcelisters.DeviceClassLister,
sliceLister resourcelisters.ResourceSliceLister,
slices []*resourceapi.ResourceSlice,
) (*Allocator, error) {
return &Allocator{
adminAccessEnabled: adminAccessEnabled,
claimsToAllocate: claimsToAllocate,
claimLister: claimLister,
allocatedDevices: allocatedDevices,
classLister: classLister,
sliceLister: sliceLister,
slices: slices,
}, nil
}
@@ -107,14 +104,13 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
deviceMatchesRequest: make(map[matchKey]bool),
constraints: make([][]constraint, len(a.claimsToAllocate)),
requestData: make(map[requestIndices]requestData),
allocated: make(map[DeviceID]bool),
result: make([]*resourceapi.AllocationResult, len(a.claimsToAllocate)),
}
alloc.logger.V(5).Info("Starting allocation", "numClaims", len(alloc.claimsToAllocate))
defer alloc.logger.V(5).Info("Done with allocation", "success", len(finalResult) == len(alloc.claimsToAllocate), "err", finalErr)
// First determine all eligible pools.
pools, err := GatherPools(ctx, alloc.sliceLister, node)
pools, err := GatherPools(ctx, alloc.slices, node)
if err != nil {
return nil, fmt.Errorf("gather pool information: %w", err)
}
@@ -144,8 +140,9 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
// and their requests. For each claim we determine how many devices
// need to be allocated. If not all can be stored in the result, the
// claim cannot be allocated.
numDevicesTotal := 0
for claimIndex, claim := range alloc.claimsToAllocate {
numDevices := 0
numDevicesPerClaim := 0
// If we have any request that wants "all" devices, we need to
// figure out how much "all" is. If some pool is incomplete, we stop
@@ -220,20 +217,20 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
return nil, fmt.Errorf("claim %s, request %s: unsupported count mode %s", klog.KObj(claim), request.Name, request.AllocationMode)
}
alloc.requestData[requestKey] = requestData
numDevices += requestData.numDevices
numDevicesPerClaim += requestData.numDevices
}
alloc.logger.V(6).Info("Checked claim", "claim", klog.KObj(claim), "numDevices", numDevices)
alloc.logger.V(6).Info("Checked claim", "claim", klog.KObj(claim), "numDevices", numDevicesPerClaim)
// Check that we don't end up with too many results.
if numDevices > resourceapi.AllocationResultsMaxSize {
return nil, fmt.Errorf("claim %s: number of requested devices %d exceeds the claim limit of %d", klog.KObj(claim), numDevices, resourceapi.AllocationResultsMaxSize)
if numDevicesPerClaim > resourceapi.AllocationResultsMaxSize {
return nil, fmt.Errorf("claim %s: number of requested devices %d exceeds the claim limit of %d", klog.KObj(claim), numDevicesPerClaim, resourceapi.AllocationResultsMaxSize)
}
// If we don't, then we can pre-allocate the result slices for
// appending the actual results later.
alloc.result[claimIndex] = &resourceapi.AllocationResult{
Devices: resourceapi.DeviceAllocationResult{
Results: make([]resourceapi.DeviceRequestAllocationResult, 0, numDevices),
Results: make([]resourceapi.DeviceRequestAllocationResult, 0, numDevicesPerClaim),
},
}
@@ -244,15 +241,16 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
for i, constraint := range claim.Spec.Devices.Constraints {
switch {
case constraint.MatchAttribute != nil:
matchAttribute := draapi.FullyQualifiedName(*constraint.MatchAttribute)
logger := alloc.logger
if loggerV := alloc.logger.V(6); loggerV.Enabled() {
logger = klog.LoggerWithName(logger, "matchAttributeConstraint")
logger = klog.LoggerWithValues(logger, "matchAttribute", *constraint.MatchAttribute)
logger = klog.LoggerWithValues(logger, "matchAttribute", matchAttribute)
}
m := &matchAttributeConstraint{
logger: logger,
requestNames: sets.New(constraint.Requests...),
attributeName: *constraint.MatchAttribute,
attributeName: matchAttribute,
}
constraints[i] = m
default:
@@ -261,6 +259,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
}
}
alloc.constraints[claimIndex] = constraints
numDevicesTotal += numDevicesPerClaim
}
// Selecting a device for a request is independent of what has been
@@ -272,30 +271,14 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
// Some of the existing devices are probably already allocated by
// claims...
claims, err := alloc.claimLister.ListAllAllocated()
numAllocated := 0
if err != nil {
return nil, fmt.Errorf("list allocated claims: %w", err)
}
for _, claim := range claims {
// Sanity check..
if claim.Status.Allocation == nil {
continue
}
for _, result := range claim.Status.Allocation.Devices.Results {
// Kubernetes 1.31 did not set this, 1.32 always does.
// Supporting 1.31 is not worth the additional code that
// would have to be written (= looking up in request) because
// it is extremely unlikely that there really is a result
// that still exists in a cluster from 1.31 where this matters.
if ptr.Deref(result.AdminAccess, false) {
// Ignore, it's not considered allocated.
continue
}
deviceID := DeviceID{Driver: result.Driver, Pool: result.Pool, Device: result.Device}
alloc.allocated[deviceID] = true
numAllocated++
}
numAllocated := len(alloc.allocatedDevices)
// Pre-allocating the map avoids growing it later. We can estimate
// the size based on what is already allocated and what we need to
// allocate.
alloc.allocated = make(map[DeviceID]bool, numAllocated+numDevicesTotal)
for _, deviceID := range alloc.allocatedDevices {
alloc.allocated[deviceID] = true
}
alloc.logger.V(6).Info("Gathered information about allocated devices", "numAllocated", numAllocated)
@@ -404,18 +387,18 @@ type requestData struct {
type deviceWithID struct {
DeviceID
device *resourceapi.BasicDevice
device *draapi.BasicDevice
}
type constraint interface {
// add is called whenever a device is about to be allocated. It must
// check whether the device matches the constraint and if yes,
// track that it is allocated.
add(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) bool
add(requestName string, device *draapi.BasicDevice, deviceID DeviceID) bool
// For every successful add there is exactly one matching removed call
// with the exact same parameters.
remove(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID)
remove(requestName string, device *draapi.BasicDevice, deviceID DeviceID)
}
// matchAttributeConstraint compares an attribute value across devices.
@@ -428,13 +411,13 @@ type constraint interface {
type matchAttributeConstraint struct {
logger klog.Logger // Includes name and attribute name, so no need to repeat in log messages.
requestNames sets.Set[string]
attributeName resourceapi.FullyQualifiedName
attributeName draapi.FullyQualifiedName
attribute *resourceapi.DeviceAttribute
attribute *draapi.DeviceAttribute
numDevices int
}
func (m *matchAttributeConstraint) add(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) bool {
func (m *matchAttributeConstraint) add(requestName string, device *draapi.BasicDevice, deviceID DeviceID) bool {
if m.requestNames.Len() > 0 && !m.requestNames.Has(requestName) {
// Device not affected by constraint.
m.logger.V(7).Info("Constraint does not apply to request", "request", requestName)
@@ -491,7 +474,7 @@ func (m *matchAttributeConstraint) add(requestName string, device *resourceapi.B
return true
}
func (m *matchAttributeConstraint) remove(requestName string, device *resourceapi.BasicDevice, deviceID DeviceID) {
func (m *matchAttributeConstraint) remove(requestName string, device *draapi.BasicDevice, deviceID DeviceID) {
if m.requestNames.Len() > 0 && !m.requestNames.Has(requestName) {
// Device not affected by constraint.
return
@@ -501,9 +484,9 @@ func (m *matchAttributeConstraint) remove(requestName string, device *resourceap
m.logger.V(7).Info("Device removed from constraint set", "device", deviceID, "numDevices", m.numDevices)
}
func lookupAttribute(device *resourceapi.BasicDevice, deviceID DeviceID, attributeName resourceapi.FullyQualifiedName) *resourceapi.DeviceAttribute {
func lookupAttribute(device *draapi.BasicDevice, deviceID DeviceID, attributeName draapi.FullyQualifiedName) *draapi.DeviceAttribute {
// Fully-qualified match?
if attr, ok := device.Attributes[resourceapi.QualifiedName(attributeName)]; ok {
if attr, ok := device.Attributes[draapi.QualifiedName(attributeName)]; ok {
return &attr
}
index := strings.Index(string(attributeName), "/")
@@ -512,14 +495,14 @@ func lookupAttribute(device *resourceapi.BasicDevice, deviceID DeviceID, attribu
return nil
}
if string(attributeName[0:index]) != deviceID.Driver {
if string(attributeName[0:index]) != deviceID.Driver.String() {
// Not an attribute of the driver and not found above,
// so it is not available.
return nil
}
// Domain matches the driver, so let's check just the ID.
if attr, ok := device.Attributes[resourceapi.QualifiedName(attributeName[index+1:])]; ok {
if attr, ok := device.Attributes[draapi.QualifiedName(attributeName[index+1:])]; ok {
return &attr
}
@@ -640,7 +623,7 @@ func (alloc *allocator) allocateOne(r deviceIndices) (bool, error) {
}
// isSelectable checks whether a device satisfies the request and class selectors.
func (alloc *allocator) isSelectable(r requestIndices, slice *resourceapi.ResourceSlice, deviceIndex int) (bool, error) {
func (alloc *allocator) isSelectable(r requestIndices, slice *draapi.ResourceSlice, deviceIndex int) (bool, error) {
// This is the only supported device type at the moment.
device := slice.Spec.Devices[deviceIndex].Basic
if device == nil {
@@ -682,7 +665,7 @@ func (alloc *allocator) isSelectable(r requestIndices, slice *resourceapi.Resour
}
func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
for i, selector := range selectors {
expr := cel.GetCompiler().CompileCELExpression(selector.CEL.Expression, cel.Options{})
if expr.Error != nil {
@@ -697,7 +680,13 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.Bas
return false, fmt.Errorf("claim %s: selector #%d: CEL compile error: %w", klog.KObj(alloc.claimsToAllocate[r.claimIndex]), i, expr.Error)
}
matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver, Attributes: device.Attributes, Capacity: device.Capacity})
// If this conversion turns out to be expensive, the CEL package could be converted
// to use unique strings.
var d resourceapi.BasicDevice
if err := draapi.Convert_api_BasicDevice_To_v1alpha3_BasicDevice(device, &d, nil); err != nil {
return false, fmt.Errorf("convert BasicDevice: %w", err)
}
matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: d.Attributes, Capacity: d.Capacity})
if class != nil {
alloc.logger.V(7).Info("CEL result", "device", deviceID, "class", klog.KObj(class), "selector", i, "expression", selector.CEL.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err)
} else {
@@ -727,7 +716,7 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *resourceapi.Bas
// as if that candidate had been allocated. If allocation cannot continue later
// and must try something else, then the rollback function can be invoked to
// restore the previous state.
func (alloc *allocator) allocateDevice(r deviceIndices, device *resourceapi.BasicDevice, deviceID DeviceID, must bool) (bool, func(), error) {
func (alloc *allocator) allocateDevice(r deviceIndices, device *draapi.BasicDevice, deviceID DeviceID, must bool) (bool, func(), error) {
claim := alloc.claimsToAllocate[r.claimIndex]
request := &claim.Spec.Devices.Requests[r.requestIndex]
adminAccess := ptr.Deref(request.AdminAccess, false)
@@ -762,9 +751,9 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device *resourceapi.Basi
}
result := resourceapi.DeviceRequestAllocationResult{
Request: request.Name,
Driver: deviceID.Driver,
Pool: deviceID.Pool,
Device: deviceID.Device,
Driver: deviceID.Driver.String(),
Pool: deviceID.Pool.String(),
Device: deviceID.Device.String(),
}
if adminAccess {
result.AdminAccess = &adminAccess
@@ -836,15 +825,18 @@ func (alloc *allocator) createNodeSelector(allocation *resourceapi.AllocationRes
return nil, nil
}
func (alloc *allocator) findSlice(deviceAllocation resourceapi.DeviceRequestAllocationResult) *resourceapi.ResourceSlice {
func (alloc *allocator) findSlice(deviceAllocation resourceapi.DeviceRequestAllocationResult) *draapi.ResourceSlice {
driverName := draapi.MakeUniqueString(deviceAllocation.Driver)
poolName := draapi.MakeUniqueString(deviceAllocation.Pool)
deviceName := draapi.MakeUniqueString(deviceAllocation.Device)
for _, pool := range alloc.pools {
if pool.Driver != deviceAllocation.Driver ||
pool.Pool != deviceAllocation.Pool {
if pool.Driver != driverName ||
pool.Pool != poolName {
continue
}
for _, slice := range pool.Slices {
for _, device := range slice.Spec.Devices {
if device.Name == deviceAllocation.Device {
if device.Name == deviceName {
return slice
}
}

View File

@@ -20,6 +20,7 @@ import (
"errors"
"flag"
"fmt"
"slices"
"testing"
"github.com/onsi/gomega"
@@ -182,17 +183,6 @@ func claimWithDeviceConfig(name, request, class, driver, attribute string) *reso
return claim
}
// generate allocated ResourceClaim object
func allocatedClaim(name, request, class string, results ...resourceapi.DeviceRequestAllocationResult) *resourceapi.ResourceClaim {
claim := claim(name, request, class)
claim.Status.Allocation = &resourceapi.AllocationResult{
Devices: resourceapi.DeviceAllocationResult{
Results: results,
},
}
return claim
}
// generate a Device object with the given name, capacity and attributes.
func device(name string, capacity map[resourceapi.QualifiedName]resource.Quantity, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) resourceapi.Device {
return resourceapi.Device{
@@ -362,7 +352,7 @@ func TestAllocator(t *testing.T) {
testcases := map[string]struct {
adminAccess bool
claimsToAllocate []*resourceapi.ResourceClaim
allocatedClaims []*resourceapi.ResourceClaim
allocatedDevices []DeviceID
classes []*resourceapi.DeviceClass
slices []*resourceapi.ResourceSlice
node *v1.Node
@@ -943,12 +933,10 @@ func TestAllocator(t *testing.T) {
},
"already-allocated-devices": {
claimsToAllocate: objects(claim(claim0, req0, classA)),
allocatedClaims: objects(
allocatedClaim(claim0, req0, classA,
deviceAllocationResult(req0, driverA, pool1, device1, false),
deviceAllocationResult(req1, driverA, pool1, device2, false),
),
),
allocatedDevices: []DeviceID{
MakeDeviceID(driverA, pool1, device1),
MakeDeviceID(driverA, pool1, device2),
},
classes: objects(class(classA, driverA)),
slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)),
node: node(node1, region1),
@@ -976,12 +964,10 @@ func TestAllocator(t *testing.T) {
c.Spec.Devices.Requests[0].AdminAccess = ptr.To(true)
return []*resourceapi.ResourceClaim{c}
}(),
allocatedClaims: objects(
allocatedClaim(claim0, req0, classA,
deviceAllocationResult(req0, driverA, pool1, device1, false),
deviceAllocationResult(req1, driverA, pool1, device2, false),
),
),
allocatedDevices: []DeviceID{
MakeDeviceID(driverA, pool1, device1),
MakeDeviceID(driverA, pool1, device2),
},
classes: objects(class(classA, driverA)),
slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)),
node: node(node1, region1),
@@ -990,22 +976,6 @@ func TestAllocator(t *testing.T) {
allocationResult(localNodeSelector(node1), deviceAllocationResult(req0, driverA, pool1, device1, true)),
},
},
"already-allocated-for-admin-access": {
claimsToAllocate: objects(claim(claim0, req0, classA)),
allocatedClaims: objects(
allocatedClaim(claim0, req0, classA,
deviceAllocationResult(req0, driverA, pool1, device1, true),
deviceAllocationResult(req1, driverA, pool1, device2, true),
),
),
classes: objects(class(classA, driverA)),
slices: objects(sliceWithOneDevice(slice1, node1, pool1, driverA)),
node: node(node1, region1),
expectResults: []any{
allocationResult(localNodeSelector(node1), deviceAllocationResult(req0, driverA, pool1, device1, false)),
},
},
"with-constraint": {
claimsToAllocate: objects(claimWithRequests(
claim0,
@@ -1397,23 +1367,15 @@ func TestAllocator(t *testing.T) {
// Listing objects is deterministic and returns them in the same
// order as in the test case. That makes the allocation result
// also deterministic.
var allocated, toAllocate claimLister
var classLister informerLister[resourceapi.DeviceClass]
var sliceLister informerLister[resourceapi.ResourceSlice]
for _, claim := range tc.claimsToAllocate {
toAllocate.claims = append(toAllocate.claims, claim.DeepCopy())
}
for _, claim := range tc.allocatedClaims {
allocated.claims = append(allocated.claims, claim.DeepCopy())
}
for _, slice := range tc.slices {
sliceLister.objs = append(sliceLister.objs, slice.DeepCopy())
}
for _, class := range tc.classes {
classLister.objs = append(classLister.objs, class.DeepCopy())
}
claimsToAllocate := slices.Clone(tc.claimsToAllocate)
allocatedDevices := slices.Clone(tc.allocatedDevices)
slices := slices.Clone(tc.slices)
allocator, err := NewAllocator(ctx, tc.adminAccess, toAllocate.claims, allocated, classLister, sliceLister)
allocator, err := NewAllocator(ctx, tc.adminAccess, claimsToAllocate, allocatedDevices, classLister, slices)
g.Expect(err).ToNot(gomega.HaveOccurred())
results, err := allocator.Allocate(ctx, tc.node)
@@ -1425,23 +1387,14 @@ func TestAllocator(t *testing.T) {
g.Expect(results).To(gomega.ConsistOf(tc.expectResults...))
// Objects that the allocator had access to should not have been modified.
g.Expect(toAllocate.claims).To(gomega.HaveExactElements(tc.claimsToAllocate))
g.Expect(allocated.claims).To(gomega.HaveExactElements(tc.allocatedClaims))
g.Expect(sliceLister.objs).To(gomega.ConsistOf(tc.slices))
g.Expect(claimsToAllocate).To(gomega.HaveExactElements(tc.claimsToAllocate))
g.Expect(allocatedDevices).To(gomega.HaveExactElements(tc.allocatedDevices))
g.Expect(slices).To(gomega.ConsistOf(tc.slices))
g.Expect(classLister.objs).To(gomega.ConsistOf(tc.classes))
})
}
}
type claimLister struct {
claims []*resourceapi.ResourceClaim
err error
}
func (l claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
return l.claims, l.err
}
type informerLister[T any] struct {
objs []*T
err error

View File

@@ -22,10 +22,9 @@ import (
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
resourcelisters "k8s.io/client-go/listers/resource/v1alpha3"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
draapi "k8s.io/dynamic-resource-allocation/api"
)
// GatherPools collects information about all resource pools which provide
@@ -34,31 +33,30 @@ import (
// Out-dated slices are silently ignored. Pools may be incomplete (not all
// required slices available) or invalid (for example, device names not unique).
// Both is recorded in the result.
func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceLister, node *v1.Node) ([]*Pool, error) {
func GatherPools(ctx context.Context, slices []*resourceapi.ResourceSlice, node *v1.Node) ([]*Pool, error) {
pools := make(map[PoolID]*Pool)
// TODO (future): use a custom lister interface and implement it with
// and indexer on the node name field. Then here we can ask for only
// slices with the right node name and those with no node name.
slices, err := sliceLister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("list resource slices: %w", err)
}
for _, slice := range slices {
switch {
case slice.Spec.NodeName != "":
if slice.Spec.NodeName == node.Name {
addSlice(pools, slice)
if err := addSlice(pools, slice); err != nil {
return nil, fmt.Errorf("add node slice %s: %w", slice.Name, err)
}
}
case slice.Spec.AllNodes:
addSlice(pools, slice)
if err := addSlice(pools, slice); err != nil {
return nil, fmt.Errorf("add cluster slice %s: %w", slice.Name, err)
}
case slice.Spec.NodeSelector != nil:
selector, err := nodeaffinity.NewNodeSelector(slice.Spec.NodeSelector)
if err != nil {
return nil, fmt.Errorf("node selector in resource slice %s: %w", slice.Name, err)
}
if selector.Match(node) {
addSlice(pools, slice)
if err := addSlice(pools, slice); err != nil {
return nil, fmt.Errorf("add matching slice %s: %w", slice.Name, err)
}
}
default:
// Nothing known was set. This must be some future, unknown extension,
@@ -84,22 +82,27 @@ func GatherPools(ctx context.Context, sliceLister resourcelisters.ResourceSliceL
return result, nil
}
func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) {
func addSlice(pools map[PoolID]*Pool, s *resourceapi.ResourceSlice) error {
var slice draapi.ResourceSlice
if err := draapi.Convert_v1alpha3_ResourceSlice_To_api_ResourceSlice(s, &slice, nil); err != nil {
return fmt.Errorf("convert ResourceSlice: %w", err)
}
id := PoolID{Driver: slice.Spec.Driver, Pool: slice.Spec.Pool.Name}
pool := pools[id]
if pool == nil {
// New pool.
pool = &Pool{
PoolID: id,
Slices: []*resourceapi.ResourceSlice{slice},
Slices: []*draapi.ResourceSlice{&slice},
}
pools[id] = pool
return
return nil
}
if slice.Spec.Pool.Generation < pool.Slices[0].Spec.Pool.Generation {
// Out-dated.
return
return nil
}
if slice.Spec.Pool.Generation > pool.Slices[0].Spec.Pool.Generation {
@@ -108,11 +111,12 @@ func addSlice(pools map[PoolID]*Pool, slice *resourceapi.ResourceSlice) {
}
// Add to pool.
pool.Slices = append(pool.Slices, slice)
pool.Slices = append(pool.Slices, &slice)
return nil
}
func poolIsInvalid(pool *Pool) (bool, string) {
devices := sets.New[string]()
devices := sets.New[draapi.UniqueString]()
for _, slice := range pool.Slices {
for _, device := range slice.Spec.Devices {
if devices.Has(device.Name) {
@@ -129,21 +133,29 @@ type Pool struct {
IsIncomplete bool
IsInvalid bool
InvalidReason string
Slices []*resourceapi.ResourceSlice
Slices []*draapi.ResourceSlice
}
type PoolID struct {
Driver, Pool string
Driver, Pool draapi.UniqueString
}
func (p PoolID) String() string {
return p.Driver + "/" + p.Pool
return p.Driver.String() + "/" + p.Pool.String()
}
type DeviceID struct {
Driver, Pool, Device string
Driver, Pool, Device draapi.UniqueString
}
func (d DeviceID) String() string {
return d.Driver + "/" + d.Pool + "/" + d.Device
return d.Driver.String() + "/" + d.Pool.String() + "/" + d.Device.String()
}
func MakeDeviceID(driver, pool, device string) DeviceID {
return DeviceID{
Driver: draapi.MakeUniqueString(driver),
Pool: draapi.MakeUniqueString(pool),
Device: draapi.MakeUniqueString(device),
}
}

View File

@@ -277,7 +277,6 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
sliceLister := informerFactory.Resource().V1alpha3().ResourceSlices().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
claimCache := assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil)
claimLister := claimLister{cache: claimCache}
informerFactory.Start(tCtx.Done())
defer func() {
tCtx.Cancel("allocResourceClaimsOp.run is shutting down")
@@ -295,6 +294,8 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
// The set of nodes is assumed to be fixed at this point.
nodes, err := nodeLister.List(labels.Everything())
tCtx.ExpectNoError(err, "list nodes")
slices, err := sliceLister.List(labels.Everything())
tCtx.ExpectNoError(err, "list slices")
// Allocate one claim at a time, picking nodes randomly. Each
// allocation is stored immediately, using the claim cache to avoid
@@ -306,7 +307,19 @@ claims:
continue
}
allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, claimLister, classLister, sliceLister)
objs := claimCache.List(nil)
allocatedDevices := make([]structured.DeviceID, 0, len(objs))
for _, obj := range objs {
claim := obj.(*resourceapi.ResourceClaim)
if claim.Status.Allocation == nil {
continue
}
for _, result := range claim.Status.Allocation.Devices.Results {
allocatedDevices = append(allocatedDevices, structured.MakeDeviceID(result.Driver, result.Pool, result.Device))
}
}
allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), []*resourceapi.ResourceClaim{claim}, allocatedDevices, classLister, slices)
tCtx.ExpectNoError(err, "create allocator")
rand.Shuffle(len(nodes), func(i, j int) {
@@ -327,19 +340,3 @@ claims:
tCtx.Fatalf("Could not allocate claim %d out of %d", i, len(claims.Items))
}
}
type claimLister struct {
cache *assumecache.AssumeCache
}
func (c claimLister) ListAllAllocated() ([]*resourceapi.ResourceClaim, error) {
objs := c.cache.List(nil)
allocatedClaims := make([]*resourceapi.ResourceClaim, 0, len(objs))
for _, obj := range objs {
claim := obj.(*resourceapi.ResourceClaim)
if claim.Status.Allocation != nil {
allocatedClaims = append(allocatedClaims, claim)
}
}
return allocatedClaims, nil
}