DRA scheduler: device taints and tolerations

Thanks to the tracker, the plugin sees all taints directly in the device
definition and can compare them against the tolerations of a request while
trying to find a device for that request.

When the feature is turned off, taints are ignored during scheduling.
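
For illustration, a minimal, self-contained sketch of that per-device check: every taint on a device has to be matched by at least one toleration of the request, otherwise the device is skipped. The real implementation is the allTaintsTolerated/taintTolerated pair added to the structured allocator below, which delegates to resourceclaim.ToleratesTaint; the matching rules spelled out in the comments here are assumptions for the sketch, not a reference for that helper.

// taints_sketch.go - hypothetical standalone example, not part of this commit.
package main

import (
    "fmt"

    resourceapi "k8s.io/api/resource/v1beta1"
)

// tolerated reports whether a single toleration matches a taint.
// Assumed semantics (mirroring node taints): an empty key matches any key,
// an empty effect matches any effect, Exists ignores the value and Equal
// compares it.
func tolerated(tol resourceapi.DeviceToleration, taint resourceapi.DeviceTaint) bool {
    if tol.Key != "" && tol.Key != taint.Key {
        return false
    }
    if tol.Effect != "" && tol.Effect != taint.Effect {
        return false
    }
    switch tol.Operator {
    case resourceapi.DeviceTolerationOpExists:
        return true
    case resourceapi.DeviceTolerationOpEqual:
        return tol.Value == taint.Value
    default:
        return false
    }
}

// allTaintsTolerated is the condition the allocator enforces per candidate
// device when the DRADeviceTaints feature gate is enabled.
func allTaintsTolerated(taints []resourceapi.DeviceTaint, tolerations []resourceapi.DeviceToleration) bool {
    for _, taint := range taints {
        ok := false
        for _, tol := range tolerations {
            if tolerated(tol, taint) {
                ok = true
                break
            }
        }
        if !ok {
            return false
        }
    }
    return true
}

func main() {
    taint := resourceapi.DeviceTaint{Key: "example.com/taint", Value: "tainted", Effect: resourceapi.DeviceTaintEffectNoSchedule}
    toleration := resourceapi.DeviceToleration{Key: "example.com/taint", Operator: resourceapi.DeviceTolerationOpExists}
    fmt.Println(allTaintsTolerated([]resourceapi.DeviceTaint{taint}, nil))                                        // false: taint not tolerated
    fmt.Println(allTaintsTolerated([]resourceapi.DeviceTaint{taint}, []resourceapi.DeviceToleration{toleration})) // true
}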
Jon Huhn 2025-02-26 17:07:23 -06:00 committed by Patrick Ohly
parent a027b439e5
commit 5760a4f282
18 changed files with 577 additions and 51 deletions

View File

@@ -33,6 +33,7 @@ import (
     "k8s.io/client-go/tools/cache"
     corev1helpers "k8s.io/component-helpers/scheduling/corev1"
     corev1nodeaffinity "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
+    resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
     "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/backend/queue"
@@ -366,6 +367,7 @@ func addAllEventHandlers(
     informerFactory informers.SharedInformerFactory,
     dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
     resourceClaimCache *assumecache.AssumeCache,
+    resourceSliceTracker *resourceslicetracker.Tracker,
     gvkMap map[framework.EventResource]framework.ActionType,
 ) error {
     var (
@@ -555,7 +557,7 @@ func addAllEventHandlers(
             }
         case framework.ResourceSlice:
             if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
-                if handlerRegistration, err = informerFactory.Resource().V1beta1().ResourceSlices().Informer().AddEventHandler(
+                if handlerRegistration, err = resourceSliceTracker.AddEventHandler(
                     buildEvtResHandler(at, framework.ResourceSlice),
                 ); err != nil {
                     return err

View File

@@ -28,12 +28,14 @@ import (
     appsv1 "k8s.io/api/apps/v1"
     batchv1 "k8s.io/api/batch/v1"
     v1 "k8s.io/api/core/v1"
+    resourcealphaapi "k8s.io/api/resource/v1alpha3"
     resourceapi "k8s.io/api/resource/v1beta1"
     storagev1 "k8s.io/api/storage/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     "k8s.io/apimachinery/pkg/util/sets"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     featuregatetesting "k8s.io/component-base/featuregate/testing"
+    resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
     "k8s.io/klog/v2"
     "k8s.io/klog/v2/ktesting"
@@ -395,6 +397,7 @@ func TestAddAllEventHandlers(t *testing.T) {
         name                   string
         gvkMap                 map[framework.EventResource]framework.ActionType
         enableDRA              bool
+        enableDRADeviceTaints  bool
         expectStaticInformers  map[reflect.Type]bool
         expectDynamicInformers map[schema.GroupVersionResource]bool
     }{
@@ -423,7 +426,7 @@ func TestAddAllEventHandlers(t *testing.T) {
             expectDynamicInformers: map[schema.GroupVersionResource]bool{},
         },
         {
-            name: "all DRA events enabled",
+            name: "core DRA events enabled",
             gvkMap: map[framework.EventResource]framework.ActionType{
                 framework.ResourceClaim: framework.Add,
                 framework.ResourceSlice: framework.Add,
@@ -440,6 +443,26 @@ func TestAddAllEventHandlers(t *testing.T) {
             },
             expectDynamicInformers: map[schema.GroupVersionResource]bool{},
         },
+        {
+            name: "all DRA events enabled",
+            gvkMap: map[framework.EventResource]framework.ActionType{
+                framework.ResourceClaim: framework.Add,
+                framework.ResourceSlice: framework.Add,
+                framework.DeviceClass:   framework.Add,
+            },
+            enableDRA:             true,
+            enableDRADeviceTaints: true,
+            expectStaticInformers: map[reflect.Type]bool{
+                reflect.TypeOf(&v1.Pod{}):                           true,
+                reflect.TypeOf(&v1.Node{}):                          true,
+                reflect.TypeOf(&v1.Namespace{}):                     true,
+                reflect.TypeOf(&resourceapi.ResourceClaim{}):        true,
+                reflect.TypeOf(&resourceapi.ResourceSlice{}):        true,
+                reflect.TypeOf(&resourcealphaapi.DeviceTaintRule{}): true,
+                reflect.TypeOf(&resourceapi.DeviceClass{}):          true,
+            },
+            expectDynamicInformers: map[schema.GroupVersionResource]bool{},
+        },
         {
             name: "add GVKs handlers defined in framework dynamically",
             gvkMap: map[framework.EventResource]framework.ActionType{
@@ -499,6 +522,7 @@ func TestAddAllEventHandlers(t *testing.T) {
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
             featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA)
+            featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRADeviceTaints, tt.enableDRADeviceTaints)
             logger, ctx := ktesting.NewTestContext(t)
             ctx, cancel := context.WithCancel(ctx)
@@ -515,12 +539,27 @@ func TestAddAllEventHandlers(t *testing.T) {
             dynclient := dyfake.NewSimpleDynamicClient(scheme)
             dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)
             var resourceClaimCache *assumecache.AssumeCache
+            var resourceSliceTracker *resourceslicetracker.Tracker
             if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
                 resourceClaimInformer := informerFactory.Resource().V1beta1().ResourceClaims().Informer()
                 resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
+                var err error
+                opts := resourceslicetracker.Options{
+                    EnableDeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
+                    SliceInformer:      informerFactory.Resource().V1beta1().ResourceSlices(),
+                }
+                if opts.EnableDeviceTaints {
+                    opts.TaintInformer = informerFactory.Resource().V1alpha3().DeviceTaintRules()
+                    opts.ClassInformer = informerFactory.Resource().V1beta1().DeviceClasses()
+                }
+                resourceSliceTracker, err = resourceslicetracker.StartTracker(ctx, opts)
+                if err != nil {
+                    t.Fatalf("couldn't start resource slice tracker: %v", err)
+                }
             }
-            if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, tt.gvkMap); err != nil {
+            if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, resourceSliceTracker, tt.gvkMap); err != nil {
                 t.Fatalf("Add event handlers failed, error = %v", err)
             }

View File

@@ -72,7 +72,7 @@ func (c *shareListerContract) StorageInfos() framework.StorageInfoLister {
 type resourceSliceListerContract struct{}

-func (c *resourceSliceListerContract) List() ([]*resourceapi.ResourceSlice, error) {
+func (c *resourceSliceListerContract) ListWithDeviceTaintRules() ([]*resourceapi.ResourceSlice, error) {
     return nil, nil
 }

View File

@@ -50,8 +50,13 @@ type SharedLister interface {
 // ResourceSliceLister can be used to obtain ResourceSlices.
 type ResourceSliceLister interface {
-    // List returns a list of all ResourceSlices.
-    List() ([]*resourceapi.ResourceSlice, error)
+    // ListWithDeviceTaintRules returns a list of all ResourceSlices with DeviceTaintRules applied
+    // if the DRADeviceTaints feature is enabled, otherwise without them.
+    //
+    // k8s.io/dynamic-resource-allocation/resourceslice/tracker provides an implementation
+    // of the necessary logic. That tracker can be instantiated as a replacement for
+    // a normal ResourceSlice informer and provides a ListPatchedResourceSlices method.
+    ListWithDeviceTaintRules() ([]*resourceapi.ResourceSlice, error)
 }

 // DeviceClassLister can be used to obtain DeviceClasses.
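
As a usage illustration of the interface comment above, a rough sketch of how a caller might stand up the tracker in place of a plain ResourceSlice informer and list the patched slices. It follows the wiring this commit adds to the scheduler further below (the Options fields, StartTracker, and ListPatchedResourceSlices all come from that code); error handling and cache syncing are reduced to the bare minimum, and the sketch is not part of the commit.

// tracker_sketch.go - hypothetical standalone example based on the wiring below.
package sketch

import (
    "context"
    "fmt"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
)

func listPatchedSlices(ctx context.Context, client kubernetes.Interface, deviceTaintsEnabled bool) error {
    informerFactory := informers.NewSharedInformerFactory(client, 0)
    opts := resourceslicetracker.Options{
        EnableDeviceTaints: deviceTaintsEnabled,
        SliceInformer:      informerFactory.Resource().V1beta1().ResourceSlices(),
        KubeClient:         client,
    }
    if opts.EnableDeviceTaints {
        // Only needed when device taints are enabled; otherwise the tracker
        // acts as a simple wrapper around the slice informer.
        opts.TaintInformer = informerFactory.Resource().V1alpha3().DeviceTaintRules()
        opts.ClassInformer = informerFactory.Resource().V1beta1().DeviceClasses()
    }
    tracker, err := resourceslicetracker.StartTracker(ctx, opts)
    if err != nil {
        return fmt.Errorf("couldn't start resource slice tracker: %w", err)
    }
    informerFactory.Start(ctx.Done())
    informerFactory.WaitForCacheSync(ctx.Done())

    // The scheduler's resourceSliceLister exposes this as ListWithDeviceTaintRules().
    slices, err := tracker.ListPatchedResourceSlices()
    if err != nil {
        return err
    }
    fmt.Printf("got %d ResourceSlices with DeviceTaintRules applied\n", len(slices))
    return nil
}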

View File

@@ -27,6 +27,7 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/client-go/informers"
     resourcelisters "k8s.io/client-go/listers/resource/v1beta1"
+    resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
     "k8s.io/dynamic-resource-allocation/structured"
     "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -44,8 +45,9 @@ type DefaultDRAManager struct {
     deviceClassLister    *deviceClassLister
 }

-func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, informerFactory informers.SharedInformerFactory) *DefaultDRAManager {
+func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, resourceSliceTracker *resourceslicetracker.Tracker, informerFactory informers.SharedInformerFactory) *DefaultDRAManager {
     logger := klog.FromContext(ctx)
     manager := &DefaultDRAManager{
         resourceClaimTracker: &claimTracker{
             cache:            claimsCache,
@@ -53,7 +55,7 @@ func NewDRAManager(ctx context.Context, claimsCache *assumecache.AssumeCache, in
             allocatedDevices: newAllocatedDevices(logger),
             logger:           logger,
         },
-        resourceSliceLister: &resourceSliceLister{sliceLister: informerFactory.Resource().V1beta1().ResourceSlices().Lister()},
+        resourceSliceLister: &resourceSliceLister{tracker: resourceSliceTracker},
         deviceClassLister:   &deviceClassLister{classLister: informerFactory.Resource().V1beta1().DeviceClasses().Lister()},
     }
@@ -79,11 +81,11 @@ func (s *DefaultDRAManager) DeviceClasses() framework.DeviceClassLister {
 var _ framework.ResourceSliceLister = &resourceSliceLister{}

 type resourceSliceLister struct {
-    sliceLister resourcelisters.ResourceSliceLister
+    tracker *resourceslicetracker.Tracker
 }

-func (l *resourceSliceLister) List() ([]*resourceapi.ResourceSlice, error) {
-    return l.sliceLister.List(labels.Everything())
+func (l *resourceSliceLister) ListWithDeviceTaintRules() ([]*resourceapi.ResourceSlice, error) {
+    return l.tracker.ListPatchedResourceSlices()
 }

 var _ framework.DeviceClassLister = &deviceClassLister{}

View File

@@ -106,6 +106,7 @@ type DynamicResources struct {
     enableAdminAccess         bool
     enablePrioritizedList     bool
     enableSchedulingQueueHint bool
+    enableDeviceTaints        bool

     fh        framework.Handle
     clientset kubernetes.Interface
@@ -123,6 +124,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
     pl := &DynamicResources{
         enabled:                   true,
         enableAdminAccess:         fts.EnableDRAAdminAccess,
+        enableDeviceTaints:        fts.EnableDRADeviceTaints,
         enablePrioritizedList:     fts.EnableDRAPrioritizedList,
         enableSchedulingQueueHint: fts.EnableSchedulingQueueHint,
@@ -448,11 +450,11 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
     if err != nil {
         return nil, statusError(logger, err)
     }
-    slices, err := pl.draManager.ResourceSlices().List()
+    slices, err := pl.draManager.ResourceSlices().ListWithDeviceTaintRules()
     if err != nil {
         return nil, statusError(logger, err)
     }
-    allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, pl.enablePrioritizedList, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
+    allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, pl.enablePrioritizedList, pl.enableDeviceTaints, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
     if err != nil {
         return nil, statusError(logger, err)
     }

View File

@@ -38,6 +38,7 @@ import (
     "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes/fake"
     cgotesting "k8s.io/client-go/testing"
+    resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
     "k8s.io/kubernetes/pkg/scheduler/framework"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
     "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
@@ -183,8 +184,22 @@ var (
     otherAllocatedClaim = st.FromResourceClaim(otherClaim).
         Allocation(allocationResult).
         Obj()
+
+    deviceTaint = resourceapi.DeviceTaint{
+        Key:    "taint-key",
+        Value:  "taint-value",
+        Effect: resourceapi.DeviceTaintEffectNoSchedule,
+    }
 )

+func taintDevices(slice *resourceapi.ResourceSlice) *resourceapi.ResourceSlice {
+    slice = slice.DeepCopy()
+    for i := range slice.Spec.Devices {
+        slice.Spec.Devices[i].Basic.Taints = append(slice.Spec.Devices[i].Basic.Taints, deviceTaint)
+    }
+    return slice
+}
+
 func reserve(claim *resourceapi.ResourceClaim, pod *v1.Pod) *resourceapi.ResourceClaim {
     return st.FromResourceClaim(claim).
         ReservedForPod(pod.Name, types.UID(pod.UID)).
@@ -343,6 +358,7 @@ func TestPlugin(t *testing.T) {
         disableDRA bool

         enableDRAPrioritizedList bool
+        enableDRADeviceTaints    bool
     }{
         "empty": {
             pod: st.MakePod().Name("foo").Namespace("default").Obj(),
@@ -604,6 +620,56 @@ func TestPlugin(t *testing.T) {
             },
         },
+        // The two test cases for device tainting only need to cover
+        // whether the feature gate is passed through to the allocator
+        // correctly. The actual logic around device taints and allocation
+        // is in the allocator.
+        "tainted-device-disabled": {
+            enableDRADeviceTaints: false,
+            pod:                   podWithClaimName,
+            claims:                []*resourceapi.ResourceClaim{pendingClaim},
+            classes:               []*resourceapi.DeviceClass{deviceClass},
+            objs:                  []apiruntime.Object{taintDevices(workerNodeSlice)},
+            want: want{
+                reserve: result{
+                    inFlightClaim: allocatedClaim,
+                },
+                prebind: result{
+                    assumedClaim: reserve(allocatedClaim, podWithClaimName),
+                    changes: change{
+                        claim: func(claim *resourceapi.ResourceClaim) *resourceapi.ResourceClaim {
+                            if claim.Name == claimName {
+                                claim = claim.DeepCopy()
+                                claim.Finalizers = allocatedClaim.Finalizers
+                                claim.Status = inUseClaim.Status
+                            }
+                            return claim
+                        },
+                    },
+                },
+                postbind: result{
+                    assumedClaim: reserve(allocatedClaim, podWithClaimName),
+                },
+            },
+        },
+        "tainted-device-enabled": {
+            enableDRADeviceTaints: true,
+            pod:                   podWithClaimName,
+            claims:                []*resourceapi.ResourceClaim{pendingClaim},
+            classes:               []*resourceapi.DeviceClass{deviceClass},
+            objs:                  []apiruntime.Object{taintDevices(workerNodeSlice)},
+            want: want{
+                filter: perNodeResult{
+                    workerNode.Name: {
+                        status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `cannot allocate all claims`),
+                    },
+                },
+                postfilter: result{
+                    status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
+                },
+            },
+        },
         "request-admin-access-with-DRAAdminAccess-featuregate": {
             // When the DRAAdminAccess feature gate is enabled,
             // Because the pending claim asks for admin access,
@@ -920,6 +986,7 @@ func TestPlugin(t *testing.T) {
             }
             features := feature.Features{
                 EnableDRAAdminAccess:            tc.enableDRAAdminAccess,
+                EnableDRADeviceTaints:           tc.enableDRADeviceTaints,
                 EnableDynamicResourceAllocation: !tc.disableDRA,
                 EnableDRAPrioritizedList:        tc.enableDRAPrioritizedList,
             }
@@ -1189,7 +1256,16 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourceapi.ResourceClaim,
     tc.client.PrependReactor("*", "*", reactor)

     tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
-    tc.draManager = NewDRAManager(tCtx, assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1beta1().ResourceClaims().Informer(), "resource claim", "", nil), tc.informerFactory)
+    resourceSliceTrackerOpts := resourceslicetracker.Options{
+        EnableDeviceTaints: true,
+        SliceInformer:      tc.informerFactory.Resource().V1beta1().ResourceSlices(),
+        TaintInformer:      tc.informerFactory.Resource().V1alpha3().DeviceTaintRules(),
+        ClassInformer:      tc.informerFactory.Resource().V1beta1().DeviceClasses(),
+        KubeClient:         tc.client,
+    }
+    resourceSliceTracker, err := resourceslicetracker.StartTracker(tCtx, resourceSliceTrackerOpts)
+    require.NoError(t, err, "couldn't start resource slice tracker")
+    tc.draManager = NewDRAManager(tCtx, assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1beta1().ResourceClaims().Informer(), "resource claim", "", nil), resourceSliceTracker, tc.informerFactory)
     opts := []runtime.Option{
         runtime.WithClientSet(tc.client),
         runtime.WithInformerFactory(tc.informerFactory),

View File

@@ -22,6 +22,7 @@ package feature
 type Features struct {
     EnableDRAPrioritizedList        bool
     EnableDRAAdminAccess            bool
+    EnableDRADeviceTaints           bool
     EnableDynamicResourceAllocation bool
     EnableVolumeCapacityPriority    bool
     EnableVolumeAttributesClass     bool

View File

@@ -48,6 +48,7 @@ func NewInTreeRegistry() runtime.Registry {
     fts := plfeature.Features{
         EnableDRAPrioritizedList:        feature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList),
         EnableDRAAdminAccess:            feature.DefaultFeatureGate.Enabled(features.DRAAdminAccess),
+        EnableDRADeviceTaints:           feature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
         EnableDynamicResourceAllocation: feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
         EnableVolumeCapacityPriority:    feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
         EnableVolumeAttributesClass:     feature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass),

View File

@@ -33,6 +33,7 @@ import (
     clientset "k8s.io/client-go/kubernetes"
     restclient "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/cache"
+    resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
     "k8s.io/klog/v2"
     configv1 "k8s.io/kube-scheduler/config/v1"
     "k8s.io/kubernetes/pkg/features"
@@ -307,11 +308,27 @@ func New(ctx context.Context,
     waitingPods := frameworkruntime.NewWaitingPodsMap()

     var resourceClaimCache *assumecache.AssumeCache
+    var resourceSliceTracker *resourceslicetracker.Tracker
     var draManager framework.SharedDRAManager
     if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
         resourceClaimInformer := informerFactory.Resource().V1beta1().ResourceClaims().Informer()
         resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
-        draManager = dynamicresources.NewDRAManager(ctx, resourceClaimCache, informerFactory)
+        resourceSliceTrackerOpts := resourceslicetracker.Options{
+            EnableDeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
+            SliceInformer:      informerFactory.Resource().V1beta1().ResourceSlices(),
+            KubeClient:         client,
+        }
+        // If device taints are disabled, the additional informers are not needed and
+        // the tracker turns into a simple wrapper around the slice informer.
+        if resourceSliceTrackerOpts.EnableDeviceTaints {
+            resourceSliceTrackerOpts.TaintInformer = informerFactory.Resource().V1alpha3().DeviceTaintRules()
+            resourceSliceTrackerOpts.ClassInformer = informerFactory.Resource().V1beta1().DeviceClasses()
+        }
+        resourceSliceTracker, err = resourceslicetracker.StartTracker(ctx, resourceSliceTrackerOpts)
+        if err != nil {
+            return nil, fmt.Errorf("couldn't start resource slice tracker: %w", err)
+        }
+        draManager = dynamicresources.NewDRAManager(ctx, resourceClaimCache, resourceSliceTracker, informerFactory)
     }

     profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
@@ -389,7 +406,7 @@ func New(ctx context.Context,
     sched.NextPod = podQueue.Pop
     sched.applyDefaultHandlers()

-    if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil {
+    if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, resourceSliceTracker, unionedGVKs(queueingHintsPerProfile)); err != nil {
         return nil, fmt.Errorf("adding event handlers: %w", err)
     }

View File

@@ -628,6 +628,9 @@ func ClusterRoles() []rbacv1.ClusterRole {
             rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("pods/finalizers").RuleOrDie(),
             rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("resourceslices").RuleOrDie(),
         )
+        if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
+            kubeSchedulerRules = append(kubeSchedulerRules, rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("devicetaintrules").RuleOrDie())
+        }
     }
     roles = append(roles, rbacv1.ClusterRole{
         // a role to use for the kube-scheduler

View File

@@ -968,6 +968,14 @@ items:
     - get
     - list
     - watch
+  - apiGroups:
+    - resource.k8s.io
+    resources:
+    - devicetaintrules
+    verbs:
+    - get
+    - list
+    - watch
 - apiVersion: rbac.authorization.k8s.io/v1
   kind: ClusterRole
   metadata:

View File

@@ -29,6 +29,7 @@ import (
     "k8s.io/apimachinery/pkg/util/sets"
     draapi "k8s.io/dynamic-resource-allocation/api"
     "k8s.io/dynamic-resource-allocation/cel"
+    "k8s.io/dynamic-resource-allocation/resourceclaim"
     "k8s.io/klog/v2"
     "k8s.io/utils/ptr"
 )
@@ -49,6 +50,7 @@ type deviceClassLister interface {
 type Allocator struct {
     adminAccessEnabled     bool
     prioritizedListEnabled bool
+    deviceTaintsEnabled    bool
     claimsToAllocate       []*resourceapi.ResourceClaim
     allocatedDevices       sets.Set[DeviceID]
     classLister            deviceClassLister
@@ -63,6 +65,7 @@ type Allocator struct {
 func NewAllocator(ctx context.Context,
     adminAccessEnabled bool,
     prioritizedListEnabled bool,
+    deviceTaintsEnabled bool,
     claimsToAllocate []*resourceapi.ResourceClaim,
     allocatedDevices sets.Set[DeviceID],
     classLister deviceClassLister,
@@ -72,6 +75,7 @@ func NewAllocator(ctx context.Context,
     return &Allocator{
         adminAccessEnabled:     adminAccessEnabled,
         prioritizedListEnabled: prioritizedListEnabled,
+        deviceTaintsEnabled:    deviceTaintsEnabled,
         claimsToAllocate:       claimsToAllocate,
         allocatedDevices:       allocatedDevices,
         classLister:            classLister,
@@ -956,6 +960,12 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
         subRequestName = requestData.request.name()
     }

+    // Might be tainted, in which case the taint has to be tolerated.
+    // The check is skipped if the feature is disabled.
+    if alloc.deviceTaintsEnabled && !allTaintsTolerated(device.basic, request) {
+        return false, nil, nil
+    }
+
     // It's available. Now check constraints.
     for i, constraint := range alloc.constraints[r.claimIndex] {
         added := constraint.add(baseRequestName, subRequestName, device.basic, device.id)
@@ -1005,6 +1015,24 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
     }, nil
 }

+func allTaintsTolerated(device *draapi.BasicDevice, request requestAccessor) bool {
+    for _, taint := range device.Taints {
+        if !taintTolerated(taint, request) {
+            return false
+        }
+    }
+    return true
+}
+
+func taintTolerated(taint resourceapi.DeviceTaint, request requestAccessor) bool {
+    for _, toleration := range request.tolerations() {
+        if resourceclaim.ToleratesTaint(toleration, taint) {
+            return true
+        }
+    }
+    return false
+}
+
 // createNodeSelector constructs a node selector for the allocation, if needed,
 // otherwise it returns nil.
 func (alloc *allocator) createNodeSelector(result []internalDeviceResult) (*v1.NodeSelector, error) {
@@ -1065,6 +1093,7 @@ type requestAccessor interface {
     adminAccess() bool
     hasAdminAccess() bool
     selectors() []resourceapi.DeviceSelector
+    tolerations() []resourceapi.DeviceToleration
 }

 // deviceRequestAccessor is an implementation of the
@@ -1101,6 +1130,10 @@ func (d *deviceRequestAccessor) selectors() []resourceapi.DeviceSelector {
     return d.request.Selectors
 }

+func (d *deviceRequestAccessor) tolerations() []resourceapi.DeviceToleration {
+    return d.request.Tolerations
+}
+
 // deviceSubRequestAccessor is an implementation of the
 // requestAccessor interface for DeviceSubRequests.
 type deviceSubRequestAccessor struct {
@@ -1135,6 +1168,10 @@ func (d *deviceSubRequestAccessor) selectors() []resourceapi.DeviceSelector {
     return d.subRequest.Selectors
 }

+func (d *deviceSubRequestAccessor) tolerations() []resourceapi.DeviceToleration {
+    return d.subRequest.Tolerations
+}
+
 func addNewNodeSelectorRequirements(from []v1.NodeSelectorRequirement, to *[]v1.NodeSelectorRequirement) {
     for _, requirement := range from {
         if !containsNodeSelectorRequirement(*to, requirement) {

View File

@@ -147,8 +147,8 @@ func classWithConfig(name, driver, attribute string) *resourceapi.DeviceClass {
 }

 // generate a ResourceClaim object with the given name and device requests.
-func claimWithRequests(name string, constraints []resourceapi.DeviceConstraint, requests ...resourceapi.DeviceRequest) *resourceapi.ResourceClaim {
-    return &resourceapi.ResourceClaim{
+func claimWithRequests(name string, constraints []resourceapi.DeviceConstraint, requests ...resourceapi.DeviceRequest) wrapResourceClaim {
+    return wrapResourceClaim{&resourceapi.ResourceClaim{
         ObjectMeta: metav1.ObjectMeta{
             Name: name,
         },
@@ -158,7 +158,7 @@ func claimWithRequests(name string, constraints []resourceapi.DeviceConstraint,
             Constraints: constraints,
         },
     },
-    }
+    }}
 }

 // generate a DeviceRequest object with the given name, class and selectors.
@@ -191,14 +191,28 @@ func requestWithPrioritizedList(name string, prioritizedRequests ...resourceapi.
 }

 // generate a ResourceClaim object with the given name, request and class.
-func claim(name, req, class string, constraints ...resourceapi.DeviceConstraint) *resourceapi.ResourceClaim {
+func claim(name, req, class string, constraints ...resourceapi.DeviceConstraint) wrapResourceClaim {
     claim := claimWithRequests(name, constraints, request(req, class, 1))
     return claim
 }

+type wrapResourceClaim struct{ *resourceapi.ResourceClaim }
+
+func (in wrapResourceClaim) obj() *resourceapi.ResourceClaim {
+    return in.ResourceClaim
+}
+
+func (in wrapResourceClaim) withTolerations(tolerations ...resourceapi.DeviceToleration) wrapResourceClaim {
+    out := in.DeepCopy()
+    for i := range out.Spec.Devices.Requests {
+        out.Spec.Devices.Requests[i].Tolerations = append(out.Spec.Devices.Requests[i].Tolerations, tolerations...)
+    }
+    return wrapResourceClaim{out}
+}
+
 // generate a ResourceClaim object with the given name, request, class, and attribute.
 // attribute is used to generate parameters in a form of JSON {attribute: attributeValue}.
-func claimWithDeviceConfig(name, request, class, driver, attribute string) *resourceapi.ResourceClaim {
+func claimWithDeviceConfig(name, request, class, driver, attribute string) wrapResourceClaim {
     claim := claim(name, request, class)
     claim.Spec.Devices.Config = []resourceapi.DeviceClaimConfiguration{
         {
@@ -208,7 +222,7 @@ func claimWithDeviceConfig(name, request, class, driver, attribute string) *reso
     return claim
 }

-func claimWithAll(name string, requests []resourceapi.DeviceRequest, constraints []resourceapi.DeviceConstraint, configs []resourceapi.DeviceClaimConfiguration) *resourceapi.ResourceClaim {
+func claimWithAll(name string, requests []resourceapi.DeviceRequest, constraints []resourceapi.DeviceConstraint, configs []resourceapi.DeviceClaimConfiguration) wrapResourceClaim {
     claim := claimWithRequests(name, constraints, requests...)
     claim.Spec.Devices.Config = configs
     return claim
@@ -222,7 +236,7 @@ func deviceClaimConfig(requests []string, deviceConfig resourceapi.DeviceConfigu
 }

 // generate a Device object with the given name, capacity and attributes.
-func device(name string, capacity map[resourceapi.QualifiedName]resource.Quantity, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) resourceapi.Device {
+func device(name string, capacity map[resourceapi.QualifiedName]resource.Quantity, attributes map[resourceapi.QualifiedName]resourceapi.DeviceAttribute) wrapDevice {
     device := resourceapi.Device{
         Name: name,
         Basic: &resourceapi.BasicDevice{
@@ -233,14 +247,27 @@ func device(name string, capacity map[resourceapi.QualifiedName]resource.Quantit
     for name, quantity := range capacity {
         device.Basic.Capacity[name] = resourceapi.DeviceCapacity{Value: quantity}
     }
-    return device
+    return wrapDevice(device)
+}
+
+type wrapDevice resourceapi.Device
+
+func (in wrapDevice) obj() resourceapi.Device {
+    return resourceapi.Device(in)
+}
+
+func (in wrapDevice) withTaints(taints ...resourceapi.DeviceTaint) wrapDevice {
+    inDevice := resourceapi.Device(in)
+    device := inDevice.DeepCopy()
+    device.Basic.Taints = append(device.Basic.Taints, taints...)
+    return wrapDevice(*device)
 }

 // generate a ResourceSlice object with the given name, node,
 // driver and pool names, generation and a list of devices.
 // The nodeSelection parameter may be a string (= node name),
 // true (= all nodes), or a node selector (= specific nodes).
-func slice(name string, nodeSelection any, pool, driver string, devices ...resourceapi.Device) *resourceapi.ResourceSlice {
+func slice(name string, nodeSelection any, pool, driver string, devices ...wrapDevice) *resourceapi.ResourceSlice {
     slice := &resourceapi.ResourceSlice{
         ObjectMeta: metav1.ObjectMeta{
             Name: name,
@@ -252,10 +279,11 @@ func slice(name string, nodeSelection any, pool, driver string, devices ...resou
             ResourceSliceCount: 1,
             Generation:         1,
         },
-        Devices: devices,
     },
     }
+    for _, device := range devices {
+        slice.Spec.Devices = append(slice.Spec.Devices, resourceapi.Device(device))
+    }

     switch nodeSelection := nodeSelection.(type) {
     case *v1.NodeSelector:
         slice.Spec.NodeSelector = nodeSelection
@@ -389,6 +417,19 @@ func objects[T any](objs ...T) []T {
     return objs
 }

+// convert a list of wrapper objects to a slice
+func unwrap[T any, O wrapper[T]](objs ...O) []T {
+    out := make([]T, len(objs))
+    for i, obj := range objs {
+        out[i] = obj.obj()
+    }
+    return out
+}
+
+type wrapper[T any] interface {
+    obj() T
+}
+
 // generate a ResourceSlice object with the given parameters and no devices
 func sliceWithNoDevices(name string, nodeSelection any, pool, driver string) *resourceapi.ResourceSlice {
     return slice(name, nodeSelection, pool, driver)
@@ -401,7 +442,7 @@ func sliceWithOneDevice(name string, nodeSelection any, pool, driver string) *re
 // generate a ResourceSclie object with the given parameters and the specified number of devices.
 func sliceWithMultipleDevices(name string, nodeSelection any, pool, driver string, count int) *resourceapi.ResourceSlice {
-    var devices []resourceapi.Device
+    var devices []wrapDevice
     for i := 0; i < count; i++ {
         devices = append(devices, device(fmt.Sprintf("device-%d", i), nil, nil))
     }
@@ -414,11 +455,36 @@ func TestAllocator(t *testing.T) {
     stringAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "stringAttribute")
     versionAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "driverVersion")
     intAttribute := resourceapi.FullyQualifiedName(driverA + "/" + "numa")
+    taintKey := "taint-key"
+    taintValue := "taint-value"
+    taintValue2 := "taint-value-2"
+    taintNoSchedule := resourceapi.DeviceTaint{
+        Key:    taintKey,
+        Value:  taintValue,
+        Effect: resourceapi.DeviceTaintEffectNoSchedule,
+    }
+    taintNoExecute := resourceapi.DeviceTaint{
+        Key:    taintKey,
+        Value:  taintValue2,
+        Effect: resourceapi.DeviceTaintEffectNoExecute,
+    }
+    tolerationNoSchedule := resourceapi.DeviceToleration{
+        Operator: resourceapi.DeviceTolerationOpExists,
+        Key:      taintKey,
+        Effect:   resourceapi.DeviceTaintEffectNoSchedule,
+    }
+    tolerationNoExecute := resourceapi.DeviceToleration{
+        Operator: resourceapi.DeviceTolerationOpEqual,
+        Key:      taintKey,
+        Value:    taintValue2,
+        Effect:   resourceapi.DeviceTaintEffectNoExecute,
+    }

     testcases := map[string]struct {
         adminAccess      bool
         prioritizedList  bool
-        claimsToAllocate []*resourceapi.ResourceClaim
+        deviceTaints     bool
+        claimsToAllocate []wrapResourceClaim
         allocatedDevices []DeviceID
         classes          []*resourceapi.DeviceClass
         slices           []*resourceapi.ResourceSlice
@@ -955,11 +1021,11 @@ func TestAllocator(t *testing.T) {
         },
         "all-devices-some-allocated-admin-access": {
             adminAccess: true,
-            claimsToAllocate: func() []*resourceapi.ResourceClaim {
+            claimsToAllocate: func() []wrapResourceClaim {
                 c := claim(claim0, req0, classA)
                 c.Spec.Devices.Requests[0].AdminAccess = ptr.To(true)
                 c.Spec.Devices.Requests[0].AllocationMode = resourceapi.DeviceAllocationModeAll
-                return []*resourceapi.ResourceClaim{c}
+                return []wrapResourceClaim{c}
             }(),
             allocatedDevices: []DeviceID{
                 MakeDeviceID(driverA, pool1, device1),
@@ -978,7 +1044,7 @@ func TestAllocator(t *testing.T) {
         "all-devices-slice-without-devices-prioritized-list": {
             prioritizedList: true,
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claimWithRequests(claim0, nil,
                         requestWithPrioritizedList(req0,
                             subRequest(subReq0, classA, 1),
@@ -1004,7 +1070,7 @@ func TestAllocator(t *testing.T) {
         "all-devices-no-slices-prioritized-list": {
             prioritizedList: true,
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claimWithRequests(claim0, nil,
                         requestWithPrioritizedList(req0,
                             subRequest(subReq0, classA, 1),
@@ -1029,7 +1095,7 @@ func TestAllocator(t *testing.T) {
         "all-devices-some-allocated-prioritized-list": {
             prioritizedList: true,
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claimWithRequests(claim0, nil,
                         requestWithPrioritizedList(req0,
                             subRequest(subReq0, classA, 1),
@@ -1152,10 +1218,10 @@ func TestAllocator(t *testing.T) {
         },
         "admin-access-disabled": {
             adminAccess: false,
-            claimsToAllocate: func() []*resourceapi.ResourceClaim {
+            claimsToAllocate: func() []wrapResourceClaim {
                 c := claim(claim0, req0, classA)
                 c.Spec.Devices.Requests[0].AdminAccess = ptr.To(true)
-                return []*resourceapi.ResourceClaim{c}
+                return []wrapResourceClaim{c}
             }(),
             classes: objects(class(classA, driverA)),
             slices:  objects(sliceWithOneDevice(slice1, node1, pool1, driverA)),
@@ -1166,10 +1232,10 @@ func TestAllocator(t *testing.T) {
         },
         "admin-access-enabled": {
             adminAccess: true,
-            claimsToAllocate: func() []*resourceapi.ResourceClaim {
+            claimsToAllocate: func() []wrapResourceClaim {
                 c := claim(claim0, req0, classA)
                 c.Spec.Devices.Requests[0].AdminAccess = ptr.To(true)
-                return []*resourceapi.ResourceClaim{c}
+                return []wrapResourceClaim{c}
             }(),
             allocatedDevices: []DeviceID{
                 MakeDeviceID(driverA, pool1, device1),
@@ -1250,7 +1316,7 @@ func TestAllocator(t *testing.T) {
         },
         "with-constraint-not-matching-int-attribute-all-devices": {
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claimWithRequests(
                         claim0,
                         []resourceapi.DeviceConstraint{{MatchAttribute: &intAttribute}},
@@ -1434,7 +1500,7 @@ func TestAllocator(t *testing.T) {
         },
         "unknown-selector": {
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claim(claim0, req0, classA)
                     claim.Spec.Devices.Requests[0].Selectors = []resourceapi.DeviceSelector{
                         { /* empty = unknown future selector */ },
@@ -1450,7 +1516,7 @@ func TestAllocator(t *testing.T) {
         },
         "unknown-allocation-mode": {
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claim(claim0, req0, classA)
                     claim.Spec.Devices.Requests[0].AllocationMode = resourceapi.DeviceAllocationMode("future-mode")
                     return claim
@@ -1464,7 +1530,7 @@ func TestAllocator(t *testing.T) {
         },
         "unknown-constraint": {
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claim(claim0, req0, classA)
                     claim.Spec.Devices.Constraints = []resourceapi.DeviceConstraint{
                         { /* empty = unknown */ },
@@ -1492,7 +1558,7 @@ func TestAllocator(t *testing.T) {
         },
         "invalid-CEL-one-device": {
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claim(claim0, req0, classA)
                     claim.Spec.Devices.Requests[0].Selectors = []resourceapi.DeviceSelector{
                         {CEL: &resourceapi.CELDeviceSelector{Expression: "noSuchVar"}},
@@ -1522,7 +1588,7 @@ func TestAllocator(t *testing.T) {
         },
         "invalid-CEL-all-devices": {
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claim(claim0, req0, classA)
                     claim.Spec.Devices.Requests[0].Selectors = []resourceapi.DeviceSelector{
                         {CEL: &resourceapi.CELDeviceSelector{Expression: "noSuchVar"}},
@@ -1846,7 +1912,7 @@ func TestAllocator(t *testing.T) {
         "prioritized-list-subrequests-with-allocation-mode-all": {
             prioritizedList: true,
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claimWithRequests(claim0, nil,
                         requestWithPrioritizedList(req0,
                             subRequest(subReq0, classA, 1),
@@ -1909,7 +1975,7 @@ func TestAllocator(t *testing.T) {
         "prioritized-list-disabled": {
             prioritizedList: false,
             claimsToAllocate: objects(
-                func() *resourceapi.ResourceClaim {
+                func() wrapResourceClaim {
                     claim := claimWithRequests(claim0, nil,
                         requestWithPrioritizedList(req0,
                             subRequest(subReq0, classA, 1),
@@ -2038,6 +2104,168 @@ func TestAllocator(t *testing.T) {
                 deviceAllocationResult(req0SubReq1, driverA, pool1, device1, false),
             )},
         },
+        "tainted-two-devices": {
+            deviceTaints:     true,
+            claimsToAllocate: objects(claim(claim0, req0, classA)),
+            classes:          objects(class(classA, driverA)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule),
+                device(device2, nil, nil).withTaints(taintNoExecute),
+            )),
+            node: node(node1, region1),
+        },
+        "tainted-one-device-two-taints": {
+            deviceTaints:     true,
+            claimsToAllocate: objects(claim(claim0, req0, classA)),
+            classes:          objects(class(classA, driverA)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule, taintNoExecute),
+            )),
+            node: node(node1, region1),
+        },
+        "tainted-two-devices-tolerated": {
+            deviceTaints:     true,
+            claimsToAllocate: objects(claim(claim0, req0, classA).withTolerations(tolerationNoExecute)),
+            classes:          objects(class(classA, driverA)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule),
+                device(device2, nil, nil).withTaints(taintNoExecute),
+            )),
+            node: node(node1, region1),
+            expectResults: []any{allocationResult(
+                localNodeSelector(node1),
+                deviceAllocationResult(req0, driverA, pool1, device2, false), // Only second device's taints are tolerated.
+            )},
+        },
+        "tainted-one-device-two-taints-both-tolerated": {
+            deviceTaints:     true,
+            claimsToAllocate: objects(claim(claim0, req0, classA).withTolerations(tolerationNoSchedule, tolerationNoExecute)),
+            classes:          objects(class(classA, driverA)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule, taintNoExecute),
+            )),
+            node: node(node1, region1),
+            expectResults: []any{allocationResult(
+                localNodeSelector(node1),
+                deviceAllocationResult(req0, driverA, pool1, device1, false),
+            )},
+        },
+        "tainted-disabled": {
+            deviceTaints:     false,
+            claimsToAllocate: objects(claim(claim0, req0, classA)),
+            classes:          objects(class(classA, driverA)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule, taintNoExecute),
+            )),
+            node: node(node1, region1),
+            expectResults: []any{allocationResult(
+                localNodeSelector(node1),
+                deviceAllocationResult(req0, driverA, pool1, device1, false),
+            )},
+        },
+        "tainted-prioritized-list": {
+            deviceTaints:    true,
+            prioritizedList: true,
+            claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0,
+                subRequest(subReq0, classB, 1),
+                subRequest(subReq1, classA, 1),
+            ))),
+            classes: objects(class(classA, driverA), class(classB, driverB)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule),
+            )),
+            node: node(node1, region1),
+        },
+        "tainted-prioritized-list-disabled": {
+            deviceTaints:    false,
+            prioritizedList: true,
+            claimsToAllocate: objects(claimWithRequests(claim0, nil, requestWithPrioritizedList(req0,
+                subRequest(subReq0, classB, 1),
+                subRequest(subReq1, classA, 1),
+            ))),
+            classes: objects(class(classA, driverA), class(classB, driverB)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule),
+            )),
+            node: node(node1, region1),
+            expectResults: []any{allocationResult(
+                localNodeSelector(node1),
+                deviceAllocationResult(req0SubReq1, driverA, pool1, device1, false),
+            )},
+        },
+        "tainted-admin-access": {
+            deviceTaints: true,
+            adminAccess:  true,
+            claimsToAllocate: func() []wrapResourceClaim {
+                c := claim(claim0, req0, classA)
+                c.Spec.Devices.Requests[0].AdminAccess = ptr.To(true)
+                return []wrapResourceClaim{c}
+            }(),
+            allocatedDevices: []DeviceID{
+                MakeDeviceID(driverA, pool1, device1),
+                MakeDeviceID(driverA, pool1, device2),
+            },
+            classes: objects(class(classA, driverA)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule),
+            )),
+            node: node(node1, region1),
+        },
+        "tainted-admin-access-disabled": {
+            deviceTaints: false,
+            adminAccess:  true,
+            claimsToAllocate: func() []wrapResourceClaim {
+                c := claim(claim0, req0, classA)
+                c.Spec.Devices.Requests[0].AdminAccess = ptr.To(true)
+                return []wrapResourceClaim{c}
+            }(),
+            allocatedDevices: []DeviceID{
+                MakeDeviceID(driverA, pool1, device1),
+                MakeDeviceID(driverA, pool1, device2),
+            },
+            classes: objects(class(classA, driverA)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule),
+            )),
+            node: node(node1, region1),
+            expectResults: []any{allocationResult(
+                localNodeSelector(node1),
+                deviceAllocationResult(req0, driverA, pool1, device1, true),
+            )},
+        },
+        "tainted-all-devices-single": {
+            deviceTaints: true,
+            claimsToAllocate: objects(claimWithRequests(claim0, nil, resourceapi.DeviceRequest{
+                Name:            req0,
+                AllocationMode:  resourceapi.DeviceAllocationModeAll,
+                DeviceClassName: classA,
+            })),
+            classes: objects(class(classA, driverA)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule),
+            )),
+            node: node(node1, region1),
+        },
+        "tainted-all-devices-single-disabled": {
+            deviceTaints: false,
+            claimsToAllocate: objects(claimWithRequests(claim0, nil, resourceapi.DeviceRequest{
+                Name:            req0,
+                AllocationMode:  resourceapi.DeviceAllocationModeAll,
+                DeviceClassName: classA,
+            })),
+            classes: objects(class(classA, driverA)),
+            slices: objects(slice(slice1, node1, pool1, driverA,
+                device(device1, nil, nil).withTaints(taintNoSchedule),
+            )),
+            node: node(node1, region1),
+            expectResults: []any{allocationResult(
+                localNodeSelector(node1),
+                deviceAllocationResult(req0, driverA, pool1, device1, false),
+            )},
+        },
     }

     for name, tc := range testcases {
@@ -2056,7 +2284,7 @@ func TestAllocator(t *testing.T) {
             allocatedDevices := slices.Clone(tc.allocatedDevices)
             slices := slices.Clone(tc.slices)

-            allocator, err := NewAllocator(ctx, tc.adminAccess, tc.prioritizedList, claimsToAllocate, sets.New(allocatedDevices...), classLister, slices, cel.NewCache(1))
+            allocator, err := NewAllocator(ctx, tc.adminAccess, tc.prioritizedList, tc.deviceTaints, unwrap(claimsToAllocate...), sets.New(allocatedDevices...), classLister, slices, cel.NewCache(1))
             g.Expect(err).ToNot(gomega.HaveOccurred())

             results, err := allocator.Allocate(ctx, tc.node)

View File

@@ -26,6 +26,7 @@ import (
     "github.com/stretchr/testify/require"

     v1 "k8s.io/api/core/v1"
+    resourcealphaapi "k8s.io/api/resource/v1alpha3"
     resourceapi "k8s.io/api/resource/v1beta1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -35,6 +36,7 @@ import (
     "k8s.io/client-go/informers"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/dynamic-resource-allocation/cel"
+    resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker"
     "k8s.io/dynamic-resource-allocation/structured"
     "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources"
@@ -277,7 +279,18 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
     informerFactory := informers.NewSharedInformerFactory(tCtx.Client(), 0)
     claimInformer := informerFactory.Resource().V1beta1().ResourceClaims().Informer()
     nodeLister := informerFactory.Core().V1().Nodes().Lister()
-    draManager := dynamicresources.NewDRAManager(tCtx, assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil), informerFactory)
+    resourceSliceTrackerOpts := resourceslicetracker.Options{
+        EnableDeviceTaints: utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
+        SliceInformer:      informerFactory.Resource().V1beta1().ResourceSlices(),
+        KubeClient:         tCtx.Client(),
+    }
+    if resourceSliceTrackerOpts.EnableDeviceTaints {
+        resourceSliceTrackerOpts.TaintInformer = informerFactory.Resource().V1alpha3().DeviceTaintRules()
+        resourceSliceTrackerOpts.ClassInformer = informerFactory.Resource().V1beta1().DeviceClasses()
+    }
+    resourceSliceTracker, err := resourceslicetracker.StartTracker(tCtx, resourceSliceTrackerOpts)
+    tCtx.ExpectNoError(err, "start resource slice tracker")
+    draManager := dynamicresources.NewDRAManager(tCtx, assumecache.NewAssumeCache(tCtx.Logger(), claimInformer, "ResourceClaim", "", nil), resourceSliceTracker, informerFactory)
     informerFactory.Start(tCtx.Done())
     defer func() {
         tCtx.Cancel("allocResourceClaimsOp.run is shutting down")
@@ -290,13 +303,17 @@ func (op *allocResourceClaimsOp) run(tCtx ktesting.TContext) {
         reflect.TypeOf(&resourceapi.ResourceSlice{}): true,
         reflect.TypeOf(&v1.Node{}):                   true,
     }
+    if utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints) {
+        expectSyncedInformers[reflect.TypeOf(&resourcealphaapi.DeviceTaintRule{})] = true
+    }
     require.Equal(tCtx, expectSyncedInformers, syncedInformers, "synced informers")
     celCache := cel.NewCache(10)

     // The set of nodes is assumed to be fixed at this point.
     nodes, err := nodeLister.List(labels.Everything())
     tCtx.ExpectNoError(err, "list nodes")
-    slices, err := draManager.ResourceSlices().List()
+    slices, err := draManager.ResourceSlices().ListWithDeviceTaintRules()
     tCtx.ExpectNoError(err, "list slices")

     // Allocate one claim at a time, picking nodes randomly. Each
@@ -321,7 +338,10 @@ claims:
             }
         }

-        allocator, err := structured.NewAllocator(tCtx, utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess), utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList),
+        allocator, err := structured.NewAllocator(tCtx,
+            utilfeature.DefaultFeatureGate.Enabled(features.DRAAdminAccess),
+            utilfeature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList),
+            utilfeature.DefaultFeatureGate.Enabled(features.DRADeviceTaints),
             []*resourceapi.ResourceClaim{claim}, allocatedDevices, draManager.DeviceClasses(), slices, celCache)
         tCtx.ExpectNoError(err, "create allocator")

View File

@@ -354,6 +354,67 @@
       maxClaimsPerNode: 10
       duration: 2s

+# SchedulingWithResourceClaimTemplateToleration is a variant of SchedulingWithResourceClaimTemplate
+# with a claim template that tolerates the taint defined in a DeviceTaintRule.
+- name: SchedulingWithResourceClaimTemplateToleration
+  featureGates:
+    DynamicResourceAllocation: true
+    DRADeviceTaints: true
+  workloadTemplate:
+  - opcode: createNodes
+    countParam: $nodesWithoutDRA
+  - opcode: createAny
+    templatePath: templates/devicetaintrule.yaml
+  - opcode: createNodes
+    nodeTemplatePath: templates/node-with-dra-test-driver.yaml
+    countParam: $nodesWithDRA
+  - opcode: createResourceDriver
+    driverName: test-driver.cdi.k8s.io
+    nodes: scheduler-perf-dra-*
+    maxClaimsPerNodeParam: $maxClaimsPerNode
+  - opcode: createAny
+    templatePath: templates/deviceclass.yaml
+  - opcode: createAny
+    templatePath: templates/resourceclaimtemplate-toleration.yaml
+    namespace: init
+  - opcode: createPods
+    namespace: init
+    countParam: $initPods
+    podTemplatePath: templates/pod-with-claim-template.yaml
+  - opcode: createAny
+    templatePath: templates/resourceclaimtemplate-toleration.yaml
+    namespace: test
+  - opcode: createPods
+    namespace: test
+    countParam: $measurePods
+    podTemplatePath: templates/pod-with-claim-template.yaml
+    collectMetrics: true
+  workloads:
+  - name: fast
+    featureGates:
+      SchedulerQueueingHints: false
+    labels: [integration-test, short]
+    params:
+      # This testcase runs through all code paths without
+      # taking too long overall.
+      nodesWithDRA: 1
+      nodesWithoutDRA: 1
+      initPods: 0
+      measurePods: 10
+      maxClaimsPerNode: 10
+  - name: fast_QueueingHintsEnabled
+    featureGates:
+      SchedulerQueueingHints: true
+    labels: [integration-test, short]
+    params:
+      # This testcase runs through all code paths without
+      # taking too long overall.
+      nodesWithDRA: 1
+      nodesWithoutDRA: 1
+      initPods: 0
+      measurePods: 10
+      maxClaimsPerNode: 10
+
 # SchedulingWithResourceClaimTemplate uses ResourceClaims
 # with deterministic names that are shared between pods.
 # There is a fixed ratio of 1:5 between claims and pods.

View File

@@ -0,0 +1,11 @@
+apiVersion: resource.k8s.io/v1alpha3
+kind: DeviceTaintRule
+metadata:
+  name: taint-all-devices
+spec:
+  # Empty selector -> all devices!
+  deviceSelector:
+  taint:
+    key: example.com/taint
+    value: tainted
+    effect: NoSchedule

View File

@@ -0,0 +1,13 @@
+apiVersion: resource.k8s.io/v1alpha3
+kind: ResourceClaimTemplate
+metadata:
+  name: test-claim-template
+spec:
+  spec:
+    devices:
+      requests:
+      - name: req-0
+        deviceClassName: test-class
+        tolerations:
+        - key: example.com/taint
+          operator: Exists