scheduler: add dynamic resource allocation plugin

The plugin handles the interaction with ResourceClaims that are referenced by a
Pod.
Patrick Ohly 2022-04-12 13:41:56 +02:00
parent 0133df3929
commit d2ff210c20
12 changed files with 1848 additions and 1 deletion
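
For orientation, here is a minimal, self-contained sketch (not part of the commit; the pod and claim names are made up) of the relationship the plugin deals with: a Pod that references a ResourceClaim in its spec, and the resourceclaim helper package, which the plugin uses in podResourceClaims below to map a pod-level claim entry to the name of the ResourceClaim API object.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/dynamic-resource-allocation/resourceclaim"
)

func main() {
    // Hypothetical claim in the pod's namespace.
    claimName := "my-gpu-claim"
    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "my-pod", Namespace: "default"},
        Spec: v1.PodSpec{
            ResourceClaims: []v1.PodResourceClaim{
                {Name: "gpu", Source: v1.ClaimSource{ResourceClaimName: &claimName}},
            },
        },
    }
    // Same helper call as in the plugin's podResourceClaims: it resolves the
    // name of the ResourceClaim object that the scheduler has to look up.
    for i := range pod.Spec.ResourceClaims {
        fmt.Println(resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i]))
    }
}

When the pod uses a ResourceClaimTemplateName source instead, the claim object is generated for the pod by the resource claim controller, which is why podResourceClaims below treats a missing claim as a normal, transient condition in that case.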


@ -63,6 +63,23 @@ func applyFeatureGates(config *v1.Plugins) {
if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) {
config.MultiPoint.Enabled = append(config.MultiPoint.Enabled, v1.Plugin{Name: names.SchedulingGates})
}
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
// This plugin should come before DefaultPreemption because if
// there is a problem with a Pod and PostFilter gets called to
// resolve the problem, it is better to first deallocate an
// idle ResourceClaim than it is to evict some Pod that might
// be doing useful work.
for i := range config.MultiPoint.Enabled {
if config.MultiPoint.Enabled[i].Name == names.DefaultPreemption {
extended := make([]v1.Plugin, 0, len(config.MultiPoint.Enabled)+1)
extended = append(extended, config.MultiPoint.Enabled[:i]...)
extended = append(extended, v1.Plugin{Name: names.DynamicResources})
extended = append(extended, config.MultiPoint.Enabled[i:]...)
config.MultiPoint.Enabled = extended
break
}
}
}
}
// mergePlugins merges the custom set into the given default one, handling disabled sets.
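
The loop above splices the new plugin into a freshly allocated slice immediately before DefaultPreemption. As a standalone illustration of that idiom (hypothetical types and names, independent of the scheduler configuration API):

package main

import "fmt"

type plugin struct{ Name string }

// insertBefore mirrors the loop in applyFeatureGates: it returns a copy of
// plugins with extra placed immediately before the first entry named target.
// If target is not present, plugins is returned unchanged.
func insertBefore(plugins []plugin, target string, extra plugin) []plugin {
    for i := range plugins {
        if plugins[i].Name == target {
            out := make([]plugin, 0, len(plugins)+1)
            out = append(out, plugins[:i]...)
            out = append(out, extra)
            out = append(out, plugins[i:]...)
            return out
        }
    }
    return plugins
}

func main() {
    enabled := []plugin{{Name: "InterPodAffinity"}, {Name: "DefaultPreemption"}, {Name: "DefaultBinder"}}
    fmt.Println(insertBefore(enabled, "DefaultPreemption", plugin{Name: "DynamicResources"}))
    // Output: [{InterPodAffinity} {DynamicResources} {DefaultPreemption} {DefaultBinder}]
}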


@ -97,6 +97,39 @@ func TestApplyFeatureGates(t *testing.T) {
},
},
},
{
name: "Feature gate DynamicResourceAllocation enabled",
features: map[featuregate.Feature]bool{
features.DynamicResourceAllocation: true,
},
wantConfig: &v1.Plugins{
MultiPoint: v1.PluginSet{
Enabled: []v1.Plugin{
{Name: names.PrioritySort},
{Name: names.NodeUnschedulable},
{Name: names.NodeName},
{Name: names.TaintToleration, Weight: pointer.Int32(3)},
{Name: names.NodeAffinity, Weight: pointer.Int32(2)},
{Name: names.NodePorts},
{Name: names.NodeResourcesFit, Weight: pointer.Int32(1)},
{Name: names.VolumeRestrictions},
{Name: names.EBSLimits},
{Name: names.GCEPDLimits},
{Name: names.NodeVolumeLimits},
{Name: names.AzureDiskLimits},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.PodTopologySpread, Weight: pointer.Int32(2)},
{Name: names.InterPodAffinity, Weight: pointer.Int32(2)},
{Name: names.DynamicResources},
{Name: names.DefaultPreemption},
{Name: names.NodeResourcesBalancedAllocation, Weight: pointer.Int32(1)},
{Name: names.ImageLocality, Weight: pointer.Int32(1)},
{Name: names.DefaultBinder},
},
},
},
},
}
for _, test := range tests {


@ -25,12 +25,14 @@ import (
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
corev1nodeaffinity "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
@ -376,6 +378,18 @@ func addAllEventHandlers(
informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"),
)
case framework.PodScheduling:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
_, _ = informerFactory.Resource().V1alpha1().PodSchedulings().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PodScheduling, "PodScheduling"),
)
}
case framework.ResourceClaim:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
_, _ = informerFactory.Resource().V1alpha1().ResourceClaims().Informer().AddEventHandler(
buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
)
}
case framework.StorageClass:
if at&framework.Add != 0 {
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(


@ -0,0 +1,8 @@
# See the OWNERS docs at https://go.k8s.io/owners
reviewers:
- klueska
- pohly
- bart0sh
labels:
- sig/node


@ -0,0 +1,804 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamicresources
import (
"context"
"errors"
"fmt"
"sort"
"sync"
v1 "k8s.io/api/core/v1"
resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
resourcev1alpha1listers "k8s.io/client-go/listers/resource/v1alpha1"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)
const (
// Name is the name of the plugin used in Registry and configurations.
Name = names.DynamicResources
stateKey framework.StateKey = Name
)
// The state is initialized in the PreFilter phase. Because we save the pointer
// in framework.CycleState, later phases don't need to call Write again to
// update the value.
type stateData struct {
// A copy of all claims for the Pod (i.e. 1:1 match with
// pod.Spec.ResourceClaims), initially with the status from the start
// of the scheduling cycle. Each claim instance is read-only because it
// might come from the informer cache. The instances get replaced when
// the plugin itself successfully does an Update.
//
// Empty if the Pod has no claims.
claims []*resourcev1alpha1.ResourceClaim
// The AvailableOnNodes node filters of the claims converted from the
// v1 API to nodeaffinity.NodeSelector by PreFilter for repeated
// evaluation in Filter. Nil for claims which don't have it.
availableOnNodes []*nodeaffinity.NodeSelector
// The indices of all claims that:
// - are allocated
// - use delayed allocation
// - were not available on at least one node
//
// Set in parallel during Filter, so write access there must be
// protected by the mutex. Used by PostFilter.
unavailableClaims sets.Int
// A pointer to the PodScheduling object for the pod, if one exists.
// Gets set on demand.
//
// Conceptually, this object belongs in the scheduler framework
// where it might get shared by different plugins. But in practice,
// it is currently only used by dynamic resource allocation and thus
// managed entirely here.
podScheduling *resourcev1alpha1.PodScheduling
// podSchedulingDirty is true if the current copy was locally modified.
podSchedulingDirty bool
mutex sync.Mutex
}
func (d *stateData) Clone() framework.StateData {
return d
}
func (d *stateData) updateClaimStatus(ctx context.Context, clientset kubernetes.Interface, index int, claim *resourcev1alpha1.ResourceClaim) error {
// TODO (#113700): replace with patch operation. Beware that patching must only succeed if the
// object has not been modified in parallel by someone else.
claim, err := clientset.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
// TODO: metric for update results, with the operation ("set selected
// node", "set PotentialNodes", etc.) as one dimension.
if err != nil {
return fmt.Errorf("update resource claim: %w", err)
}
// Remember the new instance. This is relevant when the plugin must
// update the same claim multiple times (for example, first reserve
// the claim, then later remove the reservation), because otherwise the second
// update would fail with a "was modified" error.
d.claims[index] = claim
return nil
}
// initializePodScheduling can be called concurrently. It returns an existing PodScheduling
// object if there is one already, retrieves one if not, or as a last resort creates
// one from scratch.
func (d *stateData) initializePodScheduling(ctx context.Context, pod *v1.Pod, podSchedulingLister resourcev1alpha1listers.PodSchedulingLister) (*resourcev1alpha1.PodScheduling, error) {
// TODO (#113701): check if this mutex locking can be avoided by calling initializePodScheduling during PreFilter.
d.mutex.Lock()
defer d.mutex.Unlock()
if d.podScheduling != nil {
return d.podScheduling, nil
}
podScheduling, err := podSchedulingLister.PodSchedulings(pod.Namespace).Get(pod.Name)
switch {
case apierrors.IsNotFound(err):
controller := true
podScheduling = &resourcev1alpha1.PodScheduling{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
UID: pod.UID,
Controller: &controller,
},
},
},
}
err = nil
case err != nil:
return nil, err
default:
// We have an object, but it might be obsolete.
if !metav1.IsControlledBy(podScheduling, pod) {
return nil, fmt.Errorf("PodScheduling object with UID %s is not owned by Pod %s/%s", podScheduling.UID, pod.Namespace, pod.Name)
}
}
d.podScheduling = podScheduling
return podScheduling, err
}
// publishPodScheduling creates or updates the PodScheduling object.
func (d *stateData) publishPodScheduling(ctx context.Context, clientset kubernetes.Interface, podScheduling *resourcev1alpha1.PodScheduling) error {
d.mutex.Lock()
defer d.mutex.Unlock()
var err error
logger := klog.FromContext(ctx)
msg := "Updating PodScheduling"
if podScheduling.UID == "" {
msg = "Creating PodScheduling"
}
if loggerV := logger.V(6); loggerV.Enabled() {
// At a high enough log level, dump the entire object.
loggerV.Info(msg, "podschedulingDump", podScheduling)
} else {
logger.V(5).Info(msg, "podscheduling", klog.KObj(podScheduling))
}
if podScheduling.UID == "" {
podScheduling, err = clientset.ResourceV1alpha1().PodSchedulings(podScheduling.Namespace).Create(ctx, podScheduling, metav1.CreateOptions{})
} else {
// TODO (#113700): patch here to avoid racing with drivers which update the status.
podScheduling, err = clientset.ResourceV1alpha1().PodSchedulings(podScheduling.Namespace).Update(ctx, podScheduling, metav1.UpdateOptions{})
}
if err != nil {
return err
}
d.podScheduling = podScheduling
d.podSchedulingDirty = false
return nil
}
// storePodScheduling replaces the pod scheduling object in the state.
func (d *stateData) storePodScheduling(podScheduling *resourcev1alpha1.PodScheduling) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.podScheduling = podScheduling
d.podSchedulingDirty = true
}
func statusForClaim(podScheduling *resourcev1alpha1.PodScheduling, podClaimName string) *resourcev1alpha1.ResourceClaimSchedulingStatus {
for _, status := range podScheduling.Status.ResourceClaims {
if status.Name == podClaimName {
return &status
}
}
return nil
}
// dynamicResources is a plugin that ensures that ResourceClaims are allocated.
type dynamicResources struct {
enabled bool
clientset kubernetes.Interface
claimLister resourcev1alpha1listers.ResourceClaimLister
classLister resourcev1alpha1listers.ResourceClassLister
podSchedulingLister resourcev1alpha1listers.PodSchedulingLister
}
// New initializes a new plugin and returns it.
func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
if !fts.EnableDynamicResourceAllocation {
// Disabled, won't do anything.
return &dynamicResources{}, nil
}
return &dynamicResources{
enabled: true,
clientset: fh.ClientSet(),
claimLister: fh.SharedInformerFactory().Resource().V1alpha1().ResourceClaims().Lister(),
classLister: fh.SharedInformerFactory().Resource().V1alpha1().ResourceClasses().Lister(),
podSchedulingLister: fh.SharedInformerFactory().Resource().V1alpha1().PodSchedulings().Lister(),
}, nil
}
var _ framework.PreFilterPlugin = &dynamicResources{}
var _ framework.FilterPlugin = &dynamicResources{}
var _ framework.PostFilterPlugin = &dynamicResources{}
var _ framework.PreScorePlugin = &dynamicResources{}
var _ framework.ReservePlugin = &dynamicResources{}
var _ framework.EnqueueExtensions = &dynamicResources{}
var _ framework.PostBindPlugin = &dynamicResources{}
// Name returns name of the plugin. It is used in logs, etc.
func (pl *dynamicResources) Name() string {
return Name
}
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (pl *dynamicResources) EventsToRegister() []framework.ClusterEvent {
if !pl.enabled {
return nil
}
events := []framework.ClusterEvent{
// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update},
// When a driver has provided additional information, a pod waiting for that information
// may be schedulable.
// TODO (#113702): can we change this so that such an event does not trigger *all* pods?
// Yes: https://github.com/kubernetes/kubernetes/blob/abcbaed0784baf5ed2382aae9705a8918f2daa18/pkg/scheduler/eventhandlers.go#L70
{Resource: framework.PodScheduling, ActionType: framework.Add | framework.Update},
// A resource might depend on node labels for topology filtering.
// A new or updated node may make pods schedulable.
{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel},
}
return events
}
// podResourceClaims returns the ResourceClaims for all pod.Spec.ResourceClaims.
func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha1.ResourceClaim, error) {
claims := make([]*resourcev1alpha1.ResourceClaim, 0, len(pod.Spec.ResourceClaims))
for _, resource := range pod.Spec.ResourceClaims {
claimName := resourceclaim.Name(pod, &resource)
isEphemeral := resource.Source.ResourceClaimTemplateName != nil
claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(claimName)
if err != nil {
// The error usually has enough context already ("resourceclaim "myclaim" not found"),
// but we can do better for claims generated from a template, where that
// situation is normal directly after creating a pod.
if isEphemeral && apierrors.IsNotFound(err) {
err = fmt.Errorf("waiting for dynamic resource controller to create the resourceclaim %q", claimName)
}
return nil, err
}
if claim.DeletionTimestamp != nil {
return nil, fmt.Errorf("resourceclaim %q is being deleted", claim.Name)
}
if isEphemeral {
if err := resourceclaim.IsForPod(pod, claim); err != nil {
return nil, err
}
}
// We store the pointer as returned by the lister. The
// assumption is that if a claim gets modified while our code
// runs, the cache will store a new pointer, not mutate the
// existing object that we point to here.
claims = append(claims, claim)
}
return claims, nil
}
// PreFilter is invoked at the prefilter extension point to check whether the
// pod has all of its immediate claims allocated. UnschedulableAndUnresolvable
// is returned if the pod cannot be scheduled on any node at the moment.
func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
if !pl.enabled {
return nil, nil
}
logger := klog.FromContext(ctx)
// If the pod does not reference any claim, we don't need to do
// anything for it. We just initialize an empty state to record that
// observation for the other functions. This gets updated below
// if we get that far.
s := &stateData{}
state.Write(stateKey, s)
claims, err := pl.podResourceClaims(pod)
if err != nil {
return nil, statusUnschedulable(logger, err.Error())
}
logger.V(5).Info("pod resource claims", "pod", klog.KObj(pod), "resourceclaims", klog.KObjs(claims))
// If the pod does not reference any claim, we don't need to do
// anything for it.
if len(claims) == 0 {
return nil, nil
}
s.availableOnNodes = make([]*nodeaffinity.NodeSelector, len(claims))
for index, claim := range claims {
if claim.Spec.AllocationMode == resourcev1alpha1.AllocationModeImmediate &&
claim.Status.Allocation == nil {
// This will get resolved by the resource driver.
return nil, statusUnschedulable(logger, "unallocated immediate resourceclaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
}
if claim.Status.DeallocationRequested {
// This will get resolved by the resource driver.
return nil, statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
}
if claim.Status.Allocation != nil &&
!resourceclaim.CanBeReserved(claim) &&
!resourceclaim.IsReservedForPod(pod, claim) {
// Resource is in use. The pod has to wait.
return nil, statusUnschedulable(logger, "resourceclaim in use", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
}
if claim.Status.Allocation != nil &&
claim.Status.Allocation.AvailableOnNodes != nil {
nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes)
if err != nil {
return nil, statusError(logger, err)
}
s.availableOnNodes[index] = nodeSelector
}
}
s.claims = claims
state.Write(stateKey, s)
return nil, nil
}
// PreFilterExtensions returns prefilter extensions (pod add and remove). The
// plugin does not need them, so it returns nil.
func (pl *dynamicResources) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
func getStateData(cs *framework.CycleState) (*stateData, error) {
state, err := cs.Read(stateKey)
if err != nil {
return nil, err
}
s, ok := state.(*stateData)
if !ok {
return nil, errors.New("unable to convert state into stateData")
}
return s, nil
}
// Filter is invoked at the filter extension point.
// It evaluates whether a pod can fit based on the resources it requests,
// for both allocated and unallocated claims.
//
// For claims that are already allocated, it checks that the node affinity of
// the allocation is satisfied by the given node.
//
// For claims that are not allocated yet, it checks whether the claim might
// get allocated for the node.
func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
if !pl.enabled {
return nil
}
state, err := getStateData(cs)
if err != nil {
return statusError(klog.FromContext(ctx), err)
}
if len(state.claims) == 0 {
return nil
}
logger := klog.FromContext(ctx)
node := nodeInfo.Node()
var unavailableClaims []int
for index, claim := range state.claims {
logger.V(10).Info("filtering based on resource claims of the pod", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
switch {
case claim.Status.Allocation != nil:
if nodeSelector := state.availableOnNodes[index]; nodeSelector != nil {
if !nodeSelector.Match(node) {
logger.V(5).Info("AvailableOnNodes does not match", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
unavailableClaims = append(unavailableClaims, index)
}
}
case claim.Status.DeallocationRequested:
// We shouldn't get here. PreFilter already checked this.
return statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
case claim.Spec.AllocationMode == resourcev1alpha1.AllocationModeWaitForFirstConsumer:
// The ResourceClass might have a node filter. This is
// useful for trimming the initial set of potential
// nodes before we ask the driver(s) for information
// about the specific pod.
class, err := pl.classLister.Get(claim.Spec.ResourceClassName)
if err != nil {
// If the class does not exist, then allocation cannot proceed.
return statusError(logger, fmt.Errorf("look up resource class: %v", err))
}
if class.SuitableNodes != nil {
// TODO (#113700): parse class.SuitableNodes once in PreFilter, reuse result.
matches, err := corev1helpers.MatchNodeSelectorTerms(node, class.SuitableNodes)
if err != nil {
return statusError(logger, fmt.Errorf("potential node filter: %v", err))
}
if !matches {
return statusUnschedulable(logger, "excluded by resource class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclass", klog.KObj(class))
}
}
// Now we need information from drivers.
podScheduling, err := state.initializePodScheduling(ctx, pod, pl.podSchedulingLister)
if err != nil {
return statusError(logger, err)
}
status := statusForClaim(podScheduling, pod.Spec.ResourceClaims[index].Name)
if status != nil {
for _, unsuitableNode := range status.UnsuitableNodes {
if node.Name == unsuitableNode {
return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes)
}
}
}
default:
// This should have been delayed allocation. Immediate
// allocation was already checked for in PreFilter.
return statusError(logger, fmt.Errorf("internal error, unexpected allocation mode %v", claim.Spec.AllocationMode))
}
}
if len(unavailableClaims) > 0 {
state.mutex.Lock()
defer state.mutex.Unlock()
if state.unavailableClaims == nil {
state.unavailableClaims = sets.NewInt()
}
for _, index := range unavailableClaims {
claim := state.claims[index]
// Deallocation makes more sense for claims with
// delayed allocation. Claims with immediate allocation
// would just get allocated again for a random node,
// which is unlikely to help the pod.
if claim.Spec.AllocationMode == resourcev1alpha1.AllocationModeWaitForFirstConsumer {
state.unavailableClaims.Insert(index)
}
}
return statusUnschedulable(logger, "resourceclaim not available on the node", "pod", klog.KObj(pod))
}
return nil
}
// PostFilter checks whether there are allocated claims that could get
// deallocated to help get the Pod schedulable. If yes, it picks one and
// requests its deallocation. This only gets called when filtering found no
// suitable node.
func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
if !pl.enabled {
return nil, framework.NewStatus(framework.Unschedulable, "plugin disabled")
}
logger := klog.FromContext(ctx)
state, err := getStateData(cs)
if err != nil {
return nil, statusError(logger, err)
}
if len(state.claims) == 0 {
return nil, framework.NewStatus(framework.Unschedulable, "no new claims to deallocate")
}
// Iterating over a map is random. This is intentional here: we want to
// pick one claim randomly because there is no better heuristic.
for index := range state.unavailableClaims {
claim := state.claims[index]
if len(claim.Status.ReservedFor) == 0 ||
len(claim.Status.ReservedFor) == 1 && claim.Status.ReservedFor[0].UID == pod.UID {
claim := state.claims[index].DeepCopy()
claim.Status.DeallocationRequested = true
claim.Status.ReservedFor = nil
logger.V(5).Info("Requesting deallocation of ResourceClaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
if err := state.updateClaimStatus(ctx, pl.clientset, index, claim); err != nil {
return nil, statusError(logger, err)
}
return nil, nil
}
}
return nil, framework.NewStatus(framework.Unschedulable, "still not schedulable")
}
// PreScore is passed a list of all nodes that would fit the pod. Not all
// claims are necessarily allocated yet, so here we can set the PotentialNodes
// field of the PodScheduling object while some claims are still pending.
func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
if !pl.enabled {
return nil
}
state, err := getStateData(cs)
if err != nil {
return statusError(klog.FromContext(ctx), err)
}
if len(state.claims) == 0 {
return nil
}
logger := klog.FromContext(ctx)
podScheduling, err := state.initializePodScheduling(ctx, pod, pl.podSchedulingLister)
if err != nil {
return statusError(logger, err)
}
pending := false
for _, claim := range state.claims {
if claim.Status.Allocation == nil {
pending = true
}
}
if pending && !haveAllNodes(podScheduling.Spec.PotentialNodes, nodes) {
// Remember the potential nodes. The object will get created or
// updated in Reserve. This is both an optimization and
// covers the case that PreScore doesn't get called when there
// is only a single node.
logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
podScheduling = podScheduling.DeepCopy()
numNodes := len(nodes)
if numNodes > resourcev1alpha1.PodSchedulingNodeListMaxSize {
numNodes = resourcev1alpha1.PodSchedulingNodeListMaxSize
}
podScheduling.Spec.PotentialNodes = make([]string, 0, numNodes)
if numNodes == len(nodes) {
// Copy all node names.
for _, node := range nodes {
podScheduling.Spec.PotentialNodes = append(podScheduling.Spec.PotentialNodes, node.Name)
}
} else {
// Select a random subset of the nodes to comply with
// the PotentialNodes length limit. Randomization is
// done for us by Go which iterates over map entries
// randomly.
nodeNames := map[string]struct{}{}
for _, node := range nodes {
nodeNames[node.Name] = struct{}{}
}
for nodeName := range nodeNames {
if len(podScheduling.Spec.PotentialNodes) >= resourcev1alpha1.PodSchedulingNodeListMaxSize {
break
}
podScheduling.Spec.PotentialNodes = append(podScheduling.Spec.PotentialNodes, nodeName)
}
}
sort.Strings(podScheduling.Spec.PotentialNodes)
state.storePodScheduling(podScheduling)
return nil
}
logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
return nil
}
func haveAllNodes(nodeNames []string, nodes []*v1.Node) bool {
for _, node := range nodes {
if !haveNode(nodeNames, node.Name) {
return false
}
}
return true
}
func haveNode(nodeNames []string, nodeName string) bool {
for _, n := range nodeNames {
if n == nodeName {
return true
}
}
return false
}
// Reserve reserves claims for the pod.
func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
if !pl.enabled {
return nil
}
state, err := getStateData(cs)
if err != nil {
return statusError(klog.FromContext(ctx), err)
}
if len(state.claims) == 0 {
return nil
}
numDelayedAllocationPending := 0
numClaimsWithStatusInfo := 0
logger := klog.FromContext(ctx)
podScheduling, err := state.initializePodScheduling(ctx, pod, pl.podSchedulingLister)
if err != nil {
return statusError(logger, err)
}
for index, claim := range state.claims {
if claim.Status.Allocation != nil {
// Allocated, but perhaps not reserved yet.
if resourceclaim.IsReservedForPod(pod, claim) {
logger.V(5).Info("is reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
continue
}
claim := claim.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,
resourcev1alpha1.ResourceClaimConsumerReference{
Resource: "pods",
Name: pod.Name,
UID: pod.UID,
})
logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
_, err := pl.clientset.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
// TODO: metric for update errors.
if err != nil {
return statusError(logger, err)
}
// If we get here, we know that reserving the claim for
// the pod worked and we can proceed with scheduling
// it.
} else {
// Must be delayed allocation.
numDelayedAllocationPending++
// Did the driver provide information that steered node
// selection towards a node that it can support?
if statusForClaim(podScheduling, pod.Spec.ResourceClaims[index].Name) != nil {
numClaimsWithStatusInfo++
}
}
}
if numDelayedAllocationPending == 0 {
// Nothing left to do.
return nil
}
podSchedulingDirty := state.podSchedulingDirty
if len(podScheduling.Spec.PotentialNodes) == 0 {
// PreScore was not called, probably because there was
// only one candidate. We need to ask whether that
// node is suitable, otherwise the scheduler will pick
// it forever even when it cannot satisfy the claim.
podScheduling = podScheduling.DeepCopy()
podScheduling.Spec.PotentialNodes = []string{nodeName}
logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
podSchedulingDirty = true
}
// When there is only one pending resource, we can go ahead with
// requesting allocation even when we don't have the information from
// the driver yet. Otherwise we wait for information before blindly
// making a decision that might have to be reversed later.
if numDelayedAllocationPending == 1 || numClaimsWithStatusInfo == numDelayedAllocationPending {
podScheduling = podScheduling.DeepCopy()
// TODO: can we increase the chance that the scheduler picks
// the same node as before when allocation is ongoing,
// assuming that node still fits the pod? Picking a
// different node may lead to some claims being allocated for
// one node and others for another, which then would have to be
// resolved with deallocation.
podScheduling.Spec.SelectedNode = nodeName
logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
if err := state.publishPodScheduling(ctx, pl.clientset, podScheduling); err != nil {
return statusError(logger, err)
}
return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
}
// May have been modified earlier in PreScore or above.
if podSchedulingDirty {
if err := state.publishPodScheduling(ctx, pl.clientset, podScheduling); err != nil {
return statusError(logger, err)
}
}
// More than one pending claim and not enough information about all of them.
//
// TODO: can or should we ensure that scheduling gets aborted while
// waiting for resources *before* triggering delayed volume
// provisioning? On the one hand, volume provisioning is currently
// irreversible, so it should come last. On the other hand,
// triggering both in parallel might be faster.
return statusUnschedulable(logger, "waiting for resource driver to provide information", "pod", klog.KObj(pod))
}
// Unreserve removes the pod from the ReservedFor field of all claims that
// were reserved for it. It is idempotent and does nothing if no state is
// found for the given pod.
func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
if !pl.enabled {
return
}
state, err := getStateData(cs)
if err != nil {
return
}
if len(state.claims) == 0 {
return
}
logger := klog.FromContext(ctx)
for index, claim := range state.claims {
if claim.Status.Allocation != nil &&
resourceclaim.IsReservedForPod(pod, claim) {
// Remove pod from ReservedFor.
claim := claim.DeepCopy()
reservedFor := make([]resourcev1alpha1.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor)-1)
for _, reserved := range claim.Status.ReservedFor {
// TODO: can UID be assumed to be unique across all resources or do we also need to compare Group/Version/Resource?
if reserved.UID != pod.UID {
reservedFor = append(reservedFor, reserved)
}
}
claim.Status.ReservedFor = reservedFor
logger.V(5).Info("unreserve", "resourceclaim", klog.KObj(claim))
if err := state.updateClaimStatus(ctx, pl.clientset, index, claim); err != nil {
// We will get here again when pod scheduling
// is retried.
logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim))
}
}
}
}
// PostBind is called after a pod is successfully bound to a node. At that
// point we are sure that a PodScheduling object, if it exists, is definitely
// not going to be needed anymore, so we can delete it. This is a one-shot
// attempt; there won't be any retries. That is okay because the deletion
// usually works, and where it doesn't, the garbage collector will eventually
// clean up the object.
func (pl *dynamicResources) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
if !pl.enabled {
return
}
state, err := getStateData(cs)
if err != nil {
return
}
if len(state.claims) == 0 {
return
}
// We cannot know for sure whether the PodScheduling object exists. We
// might have created it in the previous pod scheduling cycle and not
// have it in our informer cache yet. Let's try to delete, just to be
// on the safe side.
logger := klog.FromContext(ctx)
err = pl.clientset.ResourceV1alpha1().PodSchedulings(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
switch {
case apierrors.IsNotFound(err):
logger.V(5).Info("no PodScheduling object to delete")
case err != nil:
logger.Error(err, "delete PodScheduling")
default:
logger.V(5).Info("PodScheduling object deleted")
}
}
// statusUnschedulable ensures that there is a log message associated with the
// line where the status originated.
func statusUnschedulable(logger klog.Logger, reason string, kv ...interface{}) *framework.Status {
if loggerV := logger.V(5); loggerV.Enabled() {
helper, loggerV := loggerV.WithCallStackHelper()
helper()
kv = append(kv, "reason", reason)
// nolint: logcheck // warns because it cannot check key/values
loggerV.Info("pod unschedulable", kv...)
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, reason)
}
// statusError ensures that there is a log message associated with the
// line where the error originated.
func statusError(logger klog.Logger, err error, kv ...interface{}) *framework.Status {
if loggerV := logger.V(5); loggerV.Enabled() {
helper, loggerV := loggerV.WithCallStackHelper()
helper()
// nolint: logcheck // warns because it cannot check key/values
loggerV.Error(err, "dynamic resource plugin failed", kv...)
}
return framework.AsStatus(err)
}
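
The TODO (#113700) comments in updateClaimStatus and publishPodScheduling ask for the Update/UpdateStatus calls to be replaced with patches that still fail when the object was modified concurrently. A minimal sketch of one possible approach follows, assuming that a JSON merge patch which carries metadata.resourceVersion is rejected with a conflict when that version is stale; the package and function names are hypothetical and not part of this commit.

package claimpatch

import (
    "context"
    "encoding/json"
    "fmt"

    resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes"
)

// patchClaimStatus is a hypothetical variant of stateData.updateClaimStatus:
// instead of UpdateStatus it sends a JSON merge patch for the status
// subresource. Including metadata.resourceVersion in the patch body is what
// is assumed to make the API server reject the request with a conflict if
// the claim was modified in parallel.
func patchClaimStatus(ctx context.Context, clientset kubernetes.Interface, claim *resourcev1alpha1.ResourceClaim) (*resourcev1alpha1.ResourceClaim, error) {
    patch, err := json.Marshal(map[string]interface{}{
        "metadata": map[string]interface{}{
            "resourceVersion": claim.ResourceVersion,
        },
        "status": claim.Status,
    })
    if err != nil {
        return nil, fmt.Errorf("encode resource claim status patch: %w", err)
    }
    return clientset.ResourceV1alpha1().ResourceClaims(claim.Namespace).Patch(
        ctx, claim.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
}

The resourceVersion in the patch body is what would preserve the optimistic-concurrency behavior that the UpdateStatus call relies on today.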


@ -0,0 +1,789 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamicresources
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
cgotesting "k8s.io/client-go/testing"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
var (
podKind = v1.SchemeGroupVersion.WithKind("Pod")
podName = "my-pod"
podUID = "1234"
resourceName = "my-resource"
resourceName2 = resourceName + "-2"
claimName = podName + "-" + resourceName
claimName2 = podName + "-" + resourceName + "-2"
className = "my-resource-class"
namespace = "default"
resourceClass = &resourcev1alpha1.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Name: className,
},
DriverName: "some-driver",
}
podWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
UID(podUID).
PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimName: &claimName}}).
Obj()
otherPodWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
UID(podUID + "-II").
PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimName: &claimName}}).
Obj()
podWithClaimTemplate = st.MakePod().Name(podName).Namespace(namespace).
UID(podUID).
PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimTemplateName: &claimName}}).
Obj()
podWithTwoClaimNames = st.MakePod().Name(podName).Namespace(namespace).
UID(podUID).
PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimName: &claimName}}).
PodResourceClaims(v1.PodResourceClaim{Name: resourceName2, Source: v1.ClaimSource{ResourceClaimName: &claimName2}}).
Obj()
workerNode = &st.MakeNode().Name("worker").Label("nodename", "worker").Node
claim = st.MakeResourceClaim().
Name(claimName).
Namespace(namespace).
ResourceClassName(className).
Obj()
pendingImmediateClaim = st.FromResourceClaim(claim).
AllocationMode(resourcev1alpha1.AllocationModeImmediate).
Obj()
pendingDelayedClaim = st.FromResourceClaim(claim).
AllocationMode(resourcev1alpha1.AllocationModeWaitForFirstConsumer).
Obj()
pendingDelayedClaim2 = st.FromResourceClaim(pendingDelayedClaim).
Name(claimName2).
Obj()
deallocatingClaim = st.FromResourceClaim(pendingImmediateClaim).
Allocation(&resourcev1alpha1.AllocationResult{}).
DeallocationRequested(true).
Obj()
inUseClaim = st.FromResourceClaim(pendingImmediateClaim).
Allocation(&resourcev1alpha1.AllocationResult{}).
ReservedFor(resourcev1alpha1.ResourceClaimConsumerReference{UID: types.UID(podUID)}).
Obj()
allocatedClaim = st.FromResourceClaim(pendingDelayedClaim).
OwnerReference(podName, podUID, podKind).
Allocation(&resourcev1alpha1.AllocationResult{}).
Obj()
allocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedClaim).
Allocation(&resourcev1alpha1.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("no-such-label", []string{"no-such-value"}).Obj()}).
Obj()
allocatedImmediateClaimWithWrongTopology = st.FromResourceClaim(allocatedDelayedClaimWithWrongTopology).
AllocationMode(resourcev1alpha1.AllocationModeImmediate).
Obj()
allocatedClaimWithGoodTopology = st.FromResourceClaim(allocatedClaim).
Allocation(&resourcev1alpha1.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("nodename", []string{"worker"}).Obj()}).
Obj()
otherClaim = st.MakeResourceClaim().
Name("not-my-claim").
Namespace(namespace).
ResourceClassName(className).
Obj()
scheduling = st.MakePodScheduling().Name(podName).Namespace(namespace).
OwnerReference(podName, podUID, podKind).
Obj()
schedulingPotential = st.FromPodScheduling(scheduling).
PotentialNodes(workerNode.Name).
Obj()
schedulingSelectedPotential = st.FromPodScheduling(schedulingPotential).
SelectedNode(workerNode.Name).
Obj()
schedulingInfo = st.FromPodScheduling(schedulingPotential).
ResourceClaims(resourcev1alpha1.ResourceClaimSchedulingStatus{Name: resourceName},
resourcev1alpha1.ResourceClaimSchedulingStatus{Name: resourceName2}).
Obj()
)
// result defines the expected outcome of some operation. It covers the
// operation's status and the resulting state of the world (= objects).
type result struct {
status *framework.Status
// changes contains one update function per object type. These functions
// apply exactly the expected changes to a copy of the object as it
// existed before the operation.
changes change
// added contains objects created by the operation.
added []metav1.Object
// removed contains objects deleted by the operation.
removed []metav1.Object
}
// change contains functions for modifying objects of a certain type. These
// functions will get called for all objects of that type. If a function needs
// to make changes only to a particular instance, it must check the name.
type change struct {
scheduling func(*resourcev1alpha1.PodScheduling) *resourcev1alpha1.PodScheduling
claim func(*resourcev1alpha1.ResourceClaim) *resourcev1alpha1.ResourceClaim
}
type perNodeResult map[string]result
func (p perNodeResult) forNode(nodeName string) result {
if p == nil {
return result{}
}
return p[nodeName]
}
type want struct {
preFilterResult *framework.PreFilterResult
prefilter result
filter perNodeResult
prescore result
reserve result
unreserve result
postbind result
postFilterResult *framework.PostFilterResult
postfilter result
}
// prepare contains changes for objects in the API server.
// Those changes are applied before running the steps. This can
// be used to simulate concurrent changes by some other entities
// like a resource driver.
type prepare struct {
filter change
prescore change
reserve change
unreserve change
postbind change
postfilter change
}
func TestPlugin(t *testing.T) {
testcases := map[string]struct {
nodes []*v1.Node // default if unset is workerNode
pod *v1.Pod
claims []*resourcev1alpha1.ResourceClaim
classes []*resourcev1alpha1.ResourceClass
schedulings []*resourcev1alpha1.PodScheduling
prepare prepare
want want
}{
"empty": {
pod: st.MakePod().Name("foo").Namespace("default").Obj(),
},
"claim-reference": {
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{allocatedClaim, otherClaim},
},
"claim-template": {
pod: podWithClaimTemplate,
claims: []*resourcev1alpha1.ResourceClaim{allocatedClaim, otherClaim},
},
"missing-claim": {
pod: podWithClaimTemplate,
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `waiting for dynamic resource controller to create the resourceclaim "my-pod-my-resource"`),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"waiting-for-immediate-allocation": {
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{pendingImmediateClaim},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `unallocated immediate resourceclaim`),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"waiting-for-deallocation": {
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{deallocatingClaim},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim must be reallocated`),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"delayed-allocation-missing-class": {
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{pendingDelayedClaim},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.AsStatus(fmt.Errorf(`look up resource class: resourceclass.resource.k8s.io "%s" not found`, className)),
},
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"delayed-allocation-scheduling-select-immediately": {
// Create the PodScheduling object, ask for information
// and select a node.
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{pendingDelayedClaim},
classes: []*resourcev1alpha1.ResourceClass{resourceClass},
want: want{
reserve: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `waiting for resource driver to allocate resource`),
added: []metav1.Object{schedulingSelectedPotential},
},
},
},
"delayed-allocation-scheduling-ask": {
// Create the PodScheduling object, ask for
// information, but do not select a node because
// there are multiple claims.
pod: podWithTwoClaimNames,
claims: []*resourcev1alpha1.ResourceClaim{pendingDelayedClaim, pendingDelayedClaim2},
classes: []*resourcev1alpha1.ResourceClass{resourceClass},
want: want{
reserve: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `waiting for resource driver to provide information`),
added: []metav1.Object{schedulingPotential},
},
},
},
"delayed-allocation-scheduling-finish": {
// Use the populated PodScheduling object to select a
// node.
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{pendingDelayedClaim},
schedulings: []*resourcev1alpha1.PodScheduling{schedulingInfo},
classes: []*resourcev1alpha1.ResourceClass{resourceClass},
want: want{
reserve: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `waiting for resource driver to allocate resource`),
changes: change{
scheduling: func(in *resourcev1alpha1.PodScheduling) *resourcev1alpha1.PodScheduling {
return st.FromPodScheduling(in).
SelectedNode(workerNode.Name).
Obj()
},
},
},
},
},
"delayed-allocation-scheduling-finish-concurrent-label-update": {
// Use the populated PodScheduling object to select a
// node.
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{pendingDelayedClaim},
schedulings: []*resourcev1alpha1.PodScheduling{schedulingInfo},
classes: []*resourcev1alpha1.ResourceClass{resourceClass},
prepare: prepare{
reserve: change{
scheduling: func(in *resourcev1alpha1.PodScheduling) *resourcev1alpha1.PodScheduling {
// This does not actually conflict with setting the
// selected node, but because the plugin is not using
// patching yet, Update nonetheless fails.
return st.FromPodScheduling(in).
Label("hello", "world").
Obj()
},
},
},
want: want{
reserve: result{
status: framework.AsStatus(errors.New(`ResourceVersion must match the object that gets updated`)),
},
},
},
"delayed-allocation-scheduling-completed": {
// Remove PodScheduling object once the pod is scheduled.
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{allocatedClaim},
schedulings: []*resourcev1alpha1.PodScheduling{schedulingInfo},
classes: []*resourcev1alpha1.ResourceClass{resourceClass},
want: want{
reserve: result{
changes: change{
claim: func(in *resourcev1alpha1.ResourceClaim) *resourcev1alpha1.ResourceClaim {
return st.FromResourceClaim(in).
ReservedFor(resourcev1alpha1.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
Obj()
},
},
},
postbind: result{
removed: []metav1.Object{schedulingInfo},
},
},
},
"in-use-by-other": {
nodes: []*v1.Node{},
pod: otherPodWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{inUseClaim},
classes: []*resourcev1alpha1.ResourceClass{},
schedulings: []*resourcev1alpha1.PodScheduling{},
prepare: prepare{},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim in use`),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},
"wrong-topology-delayed-allocation": {
// PostFilter tries to make the pod schedulable by
// deallocating the claim.
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{allocatedDelayedClaimWithWrongTopology},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim not available on the node`),
},
},
postfilter: result{
// Claims with delayed allocation get deallocated.
changes: change{
claim: func(in *resourcev1alpha1.ResourceClaim) *resourcev1alpha1.ResourceClaim {
return st.FromResourceClaim(in).
DeallocationRequested(true).
Obj()
},
},
},
},
},
"wrong-topology-immediate-allocation": {
// PostFilter tries to make the pod schedulable by
// deallocating the claim.
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{allocatedImmediateClaimWithWrongTopology},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim not available on the node`),
},
},
postfilter: result{
// Claims with immediate allocation don't. They would just get allocated again right
// away, without considering the needs of the pod.
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
},
},
},
"good-topology": {
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{allocatedClaimWithGoodTopology},
want: want{
reserve: result{
changes: change{
claim: func(in *resourcev1alpha1.ResourceClaim) *resourcev1alpha1.ResourceClaim {
return st.FromResourceClaim(in).
ReservedFor(resourcev1alpha1.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
Obj()
},
},
},
},
},
"reserved-okay": {
pod: podWithClaimName,
claims: []*resourcev1alpha1.ResourceClaim{inUseClaim},
},
}
for name, tc := range testcases {
// We can run in parallel because logging is per-test.
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
nodes := tc.nodes
if nodes == nil {
nodes = []*v1.Node{workerNode}
}
testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings)
initialObjects := testCtx.listAll(t)
result, status := testCtx.p.PreFilter(testCtx.ctx, testCtx.state, tc.pod)
t.Run("prefilter", func(t *testing.T) {
assert.Equal(t, tc.want.preFilterResult, result)
testCtx.verify(t, tc.want.prefilter, initialObjects, result, status)
})
unschedulable := status.Code() != framework.Success
var potentialNodes []*v1.Node
initialObjects = testCtx.listAll(t)
testCtx.updateAPIServer(t, initialObjects, tc.prepare.filter)
if !unschedulable {
for _, nodeInfo := range testCtx.nodeInfos {
initialObjects = testCtx.listAll(t)
status := testCtx.p.Filter(testCtx.ctx, testCtx.state, tc.pod, nodeInfo)
nodeName := nodeInfo.Node().Name
t.Run(fmt.Sprintf("filter/%s", nodeInfo.Node().Name), func(t *testing.T) {
testCtx.verify(t, tc.want.filter.forNode(nodeName), initialObjects, nil, status)
})
if status.Code() != framework.Success {
unschedulable = true
} else {
potentialNodes = append(potentialNodes, nodeInfo.Node())
}
}
}
if !unschedulable && len(potentialNodes) > 0 {
initialObjects = testCtx.listAll(t)
initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.prescore)
status := testCtx.p.PreScore(testCtx.ctx, testCtx.state, tc.pod, potentialNodes)
t.Run("prescore", func(t *testing.T) {
testCtx.verify(t, tc.want.prescore, initialObjects, nil, status)
})
if status.Code() != framework.Success {
unschedulable = true
}
}
var selectedNode *v1.Node
if !unschedulable && len(potentialNodes) > 0 {
selectedNode = potentialNodes[0]
initialObjects = testCtx.listAll(t)
initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.reserve)
status := testCtx.p.Reserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Name)
t.Run("reserve", func(t *testing.T) {
testCtx.verify(t, tc.want.reserve, initialObjects, nil, status)
})
if status.Code() != framework.Success {
unschedulable = true
}
}
if selectedNode != nil {
if unschedulable {
initialObjects = testCtx.listAll(t)
initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.unreserve)
testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Name)
t.Run("unreserve", func(t *testing.T) {
testCtx.verify(t, tc.want.unreserve, initialObjects, nil, status)
})
} else {
initialObjects = testCtx.listAll(t)
initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.postbind)
testCtx.p.PostBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Name)
t.Run("postbind", func(t *testing.T) {
testCtx.verify(t, tc.want.postbind, initialObjects, nil, status)
})
}
} else {
initialObjects = testCtx.listAll(t)
initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.postfilter)
result, status := testCtx.p.PostFilter(testCtx.ctx, testCtx.state, tc.pod, nil /* filteredNodeStatusMap not used by plugin */)
t.Run("postfilter", func(t *testing.T) {
assert.Equal(t, tc.want.postFilterResult, result)
testCtx.verify(t, tc.want.postfilter, initialObjects, nil, status)
})
}
})
}
}
type testContext struct {
ctx context.Context
client *fake.Clientset
p *dynamicResources
nodeInfos []*framework.NodeInfo
state *framework.CycleState
}
func (tc *testContext) verify(t *testing.T, expected result, initialObjects []metav1.Object, result interface{}, status *framework.Status) {
t.Helper()
assert.Equal(t, expected.status, status)
objects := tc.listAll(t)
wantObjects := update(t, initialObjects, expected.changes)
for _, add := range expected.added {
wantObjects = append(wantObjects, add)
}
for _, remove := range expected.removed {
for i, obj := range wantObjects {
// This is a bit relaxed (no GVR comparison, no UID
// comparison) to simplify writing the test cases.
if obj.GetName() == remove.GetName() && obj.GetNamespace() == remove.GetNamespace() {
wantObjects = append(wantObjects[0:i], wantObjects[i+1:]...)
break
}
}
}
sortObjects(wantObjects)
stripObjects(wantObjects)
stripObjects(objects)
assert.Equal(t, wantObjects, objects)
}
// setGVK is implemented by metav1.TypeMeta and thus all API objects, in
// contrast to metav1.Type, which is not (?!) implemented.
type setGVK interface {
SetGroupVersionKind(gvk schema.GroupVersionKind)
}
// stripObjects removes certain fields (UID, ResourceVersion, Kind, APIVersion)
// which are not important for the comparison and might not be set.
func stripObjects(objects []metav1.Object) {
for _, obj := range objects {
obj.SetResourceVersion("")
obj.SetUID("")
if objType, ok := obj.(setGVK); ok {
objType.SetGroupVersionKind(schema.GroupVersionKind{})
}
}
}
func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) {
t.Helper()
claims, err := tc.client.ResourceV1alpha1().ResourceClaims("").List(tc.ctx, metav1.ListOptions{})
require.NoError(t, err, "list claims")
for _, claim := range claims.Items {
objects = append(objects, &claim)
}
schedulings, err := tc.client.ResourceV1alpha1().PodSchedulings("").List(tc.ctx, metav1.ListOptions{})
require.NoError(t, err, "list pod scheduling")
for _, scheduling := range schedulings.Items {
objects = append(objects, &scheduling)
}
sortObjects(objects)
return
}
// updateAPIServer modifies objects and stores any changed object in the API server.
func (tc *testContext) updateAPIServer(t *testing.T, objects []metav1.Object, updates change) []metav1.Object {
modified := update(t, objects, updates)
for i := range modified {
obj := modified[i]
if diff := cmp.Diff(objects[i], obj); diff != "" {
t.Logf("Updating %T %q, diff (-old, +new):\n%s", obj, obj.GetName(), diff)
switch obj := obj.(type) {
case *resourcev1alpha1.ResourceClaim:
obj, err := tc.client.ResourceV1alpha1().ResourceClaims(obj.Namespace).Update(tc.ctx, obj, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("unexpected error during prepare update: %v", err)
}
modified[i] = obj
case *resourcev1alpha1.PodScheduling:
obj, err := tc.client.ResourceV1alpha1().PodSchedulings(obj.Namespace).Update(tc.ctx, obj, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("unexpected error during prepare update: %v", err)
}
modified[i] = obj
default:
t.Fatalf("unsupported object type %T", obj)
}
}
}
return modified
}
func sortObjects(objects []metav1.Object) {
sort.Slice(objects, func(i, j int) bool {
if objects[i].GetNamespace() < objects[j].GetNamespace() {
return true
}
return objects[i].GetName() < objects[j].GetName()
})
}
// update walks through all existing objects, finds the corresponding update
// function based on the object type, and replaces those objects that have an
// update function. The rest is left unchanged.
func update(t *testing.T, objects []metav1.Object, updates change) []metav1.Object {
var updated []metav1.Object
for _, obj := range objects {
switch in := obj.(type) {
case *resourcev1alpha1.ResourceClaim:
if updates.claim != nil {
obj = updates.claim(in)
}
case *resourcev1alpha1.PodScheduling:
if updates.scheduling != nil {
obj = updates.scheduling(in)
}
}
updated = append(updated, obj)
}
return updated
}
func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha1.ResourceClaim, classes []*resourcev1alpha1.ResourceClass, schedulings []*resourcev1alpha1.PodScheduling) (result *testContext) {
t.Helper()
tc := &testContext{}
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
tc.ctx = ctx
tc.client = fake.NewSimpleClientset()
reactor := createReactor(tc.client.Tracker())
tc.client.PrependReactor("*", "*", reactor)
informerFactory := informers.NewSharedInformerFactory(tc.client, 0)
opts := []runtime.Option{
runtime.WithClientSet(tc.client),
runtime.WithInformerFactory(informerFactory),
}
fh, err := runtime.NewFramework(nil, nil, tc.ctx.Done(), opts...)
if err != nil {
t.Fatal(err)
}
pl, err := New(nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
if err != nil {
t.Fatal(err)
}
tc.p = pl.(*dynamicResources)
// The tests use the API to create the objects because then reactors
// get triggered.
for _, claim := range claims {
_, err := tc.client.ResourceV1alpha1().ResourceClaims(claim.Namespace).Create(tc.ctx, claim, metav1.CreateOptions{})
require.NoError(t, err, "create resource claim")
}
for _, class := range classes {
_, err := tc.client.ResourceV1alpha1().ResourceClasses().Create(tc.ctx, class, metav1.CreateOptions{})
require.NoError(t, err, "create resource class")
}
for _, scheduling := range schedulings {
_, err := tc.client.ResourceV1alpha1().PodSchedulings(scheduling.Namespace).Create(tc.ctx, scheduling, metav1.CreateOptions{})
require.NoError(t, err, "create pod scheduling")
}
informerFactory.Start(tc.ctx.Done())
t.Cleanup(func() {
// Need to cancel before waiting for the shutdown.
cancel()
// Now we can wait for all goroutines to stop.
informerFactory.Shutdown()
})
informerFactory.WaitForCacheSync(tc.ctx.Done())
for _, node := range nodes {
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(node)
tc.nodeInfos = append(tc.nodeInfos, nodeInfo)
}
tc.state = framework.NewCycleState()
return tc
}
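A hedged sketch of how a test can combine setup with the wrappers from the scheduler testing package; the st alias for k8s.io/kubernetes/pkg/scheduler/testing, the pointer alias for k8s.io/utils/pointer, and all object names are assumptions:

	// Hypothetical usage sketch, not part of the commit.
	claim := st.MakeResourceClaim().Name("claim").Namespace("ns").
		ResourceClassName("class").
		AllocationMode(resourcev1alpha1.AllocationModeWaitForFirstConsumer).Obj()
	pod := st.MakePod().Name("pod").Namespace("ns").
		PodResourceClaims(v1.PodResourceClaim{
			Name:   "my-claim",
			Source: v1.ClaimSource{ResourceClaimName: pointer.String("claim")},
		}).Obj()
	tc := setup(t, nil, []*resourcev1alpha1.ResourceClaim{claim}, nil, nil)
	_, status := tc.p.PreFilter(tc.ctx, tc.state, pod)
	t.Logf("PreFilter status: %v", status)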
// createReactor implements the logic required for the UID and ResourceVersion
// fields to work when using the fake client. Add it with client.PrependReactor
// to your fake client. ResourceVersion handling is required for conflict
// detection during updates, which is covered by some scenarios.
func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Action) (handled bool, ret apiruntime.Object, err error) {
var uidCounter int
var resourceVersionCounter int
var mutex sync.Mutex
return func(action cgotesting.Action) (handled bool, ret apiruntime.Object, err error) {
createAction, ok := action.(cgotesting.CreateAction)
if !ok {
return false, nil, nil
}
obj, ok := createAction.GetObject().(metav1.Object)
if !ok {
return false, nil, nil
}
mutex.Lock()
defer mutex.Unlock()
switch action.GetVerb() {
case "create":
if obj.GetUID() != "" {
return true, nil, errors.New("UID must not be set on create")
}
if obj.GetResourceVersion() != "" {
return true, nil, errors.New("ResourceVersion must not be set on create")
}
obj.SetUID(types.UID(fmt.Sprintf("UID-%d", uidCounter)))
uidCounter++
obj.SetResourceVersion(fmt.Sprintf("REV-%d", resourceVersionCounter))
resourceVersionCounter++
case "update":
uid := obj.GetUID()
resourceVersion := obj.GetResourceVersion()
if uid == "" {
return true, nil, errors.New("UID must be set on update")
}
if resourceVersion == "" {
return true, nil, errors.New("ResourceVersion must be set on update")
}
oldObj, err := tracker.Get(action.GetResource(), obj.GetNamespace(), obj.GetName())
if err != nil {
return true, nil, err
}
oldObjMeta, ok := oldObj.(metav1.Object)
if !ok {
return true, nil, errors.New("internal error: unexpected old object type")
}
if oldObjMeta.GetResourceVersion() != resourceVersion {
return true, nil, errors.New("ResourceVersion must match the object that gets updated")
}
obj.SetResourceVersion(fmt.Sprintf("REV-%d", resourceVersionCounter))
resourceVersionCounter++
}
return false, nil, nil
}
}
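A hedged sketch of the conflict detection this enables: once another writer has bumped the stored object, an update through a stale copy is rejected by the reactor (object names are illustrative):

	// Hypothetical sketch of the ResourceVersion check in action.
	created, _ := tc.client.ResourceV1alpha1().ResourceClaims("ns").Create(
		tc.ctx, st.MakeResourceClaim().Name("c").Namespace("ns").Obj(), metav1.CreateOptions{})
	stale := created.DeepCopy()
	// A successful update bumps the stored ResourceVersion...
	_, _ = tc.client.ResourceV1alpha1().ResourceClaims("ns").Update(tc.ctx, created, metav1.UpdateOptions{})
	// ...so writing through the stale copy now fails with a mismatch error.
	_, err := tc.client.ResourceV1alpha1().ResourceClaims("ns").Update(tc.ctx, stale, metav1.UpdateOptions{})
	if err == nil {
		t.Fatal("expected ResourceVersion conflict")
	}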

View File

@ -20,6 +20,7 @@ package feature
// This struct allows us to break the dependency of the plugins on
// the internal k8s features pkg.
type Features struct {
EnableDynamicResourceAllocation bool
EnableReadWriteOncePod bool
EnableVolumeCapacityPriority bool
EnableMinDomainsInPodTopologySpread bool
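For context, a hedged sketch of the pattern this field enables; the type and constructor names below are illustrative, not the ones used by the real plugin:

	// hypotheticalPlugin stands in for a feature-gated plugin.
	type hypotheticalPlugin struct{ enabled bool }

	func newHypothetical(fts Features) *hypotheticalPlugin {
		// Registered unconditionally, but a no-op unless the gate is on.
		return &hypotheticalPlugin{enabled: fts.EnableDynamicResourceAllocation}
	}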

View File

@ -20,6 +20,7 @@ const (
PrioritySort = "PrioritySort"
DefaultBinder = "DefaultBinder"
DefaultPreemption = "DefaultPreemption"
DynamicResources = "DynamicResources"
ImageLocality = "ImageLocality"
InterPodAffinity = "InterPodAffinity"
NodeAffinity = "NodeAffinity"

View File

@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
@ -46,6 +47,7 @@ import (
// through the WithFrameworkOutOfTreeRegistry option.
func NewInTreeRegistry() runtime.Registry {
fts := plfeature.Features{
EnableDynamicResourceAllocation: feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
EnableReadWriteOncePod: feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
EnableMinDomainsInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread),
@ -54,7 +56,8 @@ func NewInTreeRegistry() runtime.Registry {
EnablePodSchedulingReadiness: feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness),
}
return runtime.Registry{
registry := runtime.Registry{
dynamicresources.Name: runtime.FactoryAdapter(fts, dynamicresources.New),
selectorspread.Name: selectorspread.New,
imagelocality.Name: imagelocality.New,
tainttoleration.Name: tainttoleration.New,
@ -78,4 +81,6 @@ func NewInTreeRegistry() runtime.Registry {
defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New),
schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New),
}
return registry
}
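A hedged aside on the FactoryAdapter call above: it adapts a constructor that wants the feature flags to the plain factory shape the registry expects, so an out-of-tree registry could wire up a similar plugin the same way (the plugin name and package here are hypothetical):

	outOfTree := runtime.Registry{
		// myplugin.New is assumed to have the (args, handle, features)
		// shape used by dynamicresources.New above.
		"MyDRAAwarePlugin": runtime.FactoryAdapter(fts, myplugin.New),
	}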

View File

@ -64,6 +64,8 @@ const (
Node GVK = "Node"
PersistentVolume GVK = "PersistentVolume"
PersistentVolumeClaim GVK = "PersistentVolumeClaim"
PodScheduling GVK = "PodScheduling"
ResourceClaim GVK = "ResourceClaim"
StorageClass GVK = "storage.k8s.io/StorageClass"
CSINode GVK = "storage.k8s.io/CSINode"
CSIDriver GVK = "storage.k8s.io/CSIDriver"
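A hedged sketch of how a plugin consumes the new GVKs: listing them in EventsToRegister tells the scheduler which object changes should trigger another scheduling attempt for pods rejected by the plugin (the receiver type is illustrative):

	func (pl *hypotheticalPlugin) EventsToRegister() []framework.ClusterEvent {
		return []framework.ClusterEvent{
			// Re-queue pods when claims get (de)allocated or when drivers
			// update the per-pod PodScheduling object.
			{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update},
			{Resource: framework.PodScheduling, ActionType: framework.Add | framework.Update},
		}
	}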

View File

@ -20,6 +20,7 @@ import (
"fmt"
v1 "k8s.io/api/core/v1"
resourcev1alpha1 "k8s.io/api/resource/v1alpha1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -249,6 +250,12 @@ func (p *PodWrapper) Containers(containers []v1.Container) *PodWrapper {
return p
}
// PodResourceClaims appends PodResourceClaims into PodSpec of the inner pod.
func (p *PodWrapper) PodResourceClaims(podResourceClaims ...v1.PodResourceClaim) *PodWrapper {
p.Spec.ResourceClaims = append(p.Spec.ResourceClaims, podResourceClaims...)
return p
}
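A hedged usage example for the new helper; the claim names and the pointer alias for k8s.io/utils/pointer are assumptions:

	pod := MakePod().Name("pod").Namespace("ns").
		PodResourceClaims(v1.PodResourceClaim{
			Name:   "my-claim",
			Source: v1.ClaimSource{ResourceClaimName: pointer.String("external-claim")},
		}).
		Obj()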
// Priority sets a priority value into PodSpec of the inner pod.
func (p *PodWrapper) Priority(val int32) *PodWrapper {
p.Spec.Priority = &val
@ -791,3 +798,160 @@ func (p *PersistentVolumeWrapper) HostPathVolumeSource(src *v1.HostPathVolumeSou
p.PersistentVolume.Spec.HostPath = src
return p
}
// ResourceClaimWrapper wraps a ResourceClaim inside.
type ResourceClaimWrapper struct{ resourcev1alpha1.ResourceClaim }
// MakeResourceClaim creates a ResourceClaim wrapper.
func MakeResourceClaim() *ResourceClaimWrapper {
return &ResourceClaimWrapper{resourcev1alpha1.ResourceClaim{}}
}
// FromResourceClaim creates a ResourceClaim wrapper from some existing object.
func FromResourceClaim(other *resourcev1alpha1.ResourceClaim) *ResourceClaimWrapper {
return &ResourceClaimWrapper{*other.DeepCopy()}
}
// Obj returns the inner ResourceClaim.
func (wrapper *ResourceClaimWrapper) Obj() *resourcev1alpha1.ResourceClaim {
return &wrapper.ResourceClaim
}
// Name sets `s` as the name of the inner object.
func (wrapper *ResourceClaimWrapper) Name(s string) *ResourceClaimWrapper {
wrapper.SetName(s)
return wrapper
}
// UID sets `s` as the UID of the inner object.
func (wrapper *ResourceClaimWrapper) UID(s string) *ResourceClaimWrapper {
wrapper.SetUID(types.UID(s))
return wrapper
}
// Namespace sets `s` as the namespace of the inner object.
func (wrapper *ResourceClaimWrapper) Namespace(s string) *ResourceClaimWrapper {
wrapper.SetNamespace(s)
return wrapper
}
// OwnerReference updates the owning controller of the object.
func (wrapper *ResourceClaimWrapper) OwnerReference(name, uid string, gvk schema.GroupVersionKind) *ResourceClaimWrapper {
wrapper.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: gvk.GroupVersion().String(),
Kind: gvk.Kind,
Name: name,
UID: types.UID(uid),
Controller: pointer.Bool(true),
},
}
return wrapper
}
// AllocationMode sets the allocation mode of the inner object.
func (wrapper *ResourceClaimWrapper) AllocationMode(a resourcev1alpha1.AllocationMode) *ResourceClaimWrapper {
wrapper.ResourceClaim.Spec.AllocationMode = a
return wrapper
}
// ResourceClassName sets the resource class name of the inner object.
func (wrapper *ResourceClaimWrapper) ResourceClassName(name string) *ResourceClaimWrapper {
wrapper.ResourceClaim.Spec.ResourceClassName = name
return wrapper
}
// Allocation sets the allocation of the inner object.
func (wrapper *ResourceClaimWrapper) Allocation(allocation *resourcev1alpha1.AllocationResult) *ResourceClaimWrapper {
wrapper.ResourceClaim.Status.Allocation = allocation
return wrapper
}
// DeallocationRequested sets that field of the inner object.
func (wrapper *ResourceClaimWrapper) DeallocationRequested(deallocationRequested bool) *ResourceClaimWrapper {
wrapper.ResourceClaim.Status.DeallocationRequested = deallocationRequested
return wrapper
}
// ReservedFor sets that field of the inner object.
func (wrapper *ResourceClaimWrapper) ReservedFor(consumers ...resourcev1alpha1.ResourceClaimConsumerReference) *ResourceClaimWrapper {
wrapper.ResourceClaim.Status.ReservedFor = consumers
return wrapper
}
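A hedged example of chaining these setters to build a claim fixture that is already allocated and reserved; all values are illustrative:

	claim := MakeResourceClaim().
		Name("claim").
		Namespace("ns").
		ResourceClassName("class").
		AllocationMode(resourcev1alpha1.AllocationModeWaitForFirstConsumer).
		Allocation(&resourcev1alpha1.AllocationResult{}).
		ReservedFor(resourcev1alpha1.ResourceClaimConsumerReference{
			Resource: "pods",
			Name:     "pod",
			UID:      "pod-uid",
		}).
		Obj()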
// PodSchedulingWrapper wraps a PodScheduling inside.
type PodSchedulingWrapper struct{ resourcev1alpha1.PodScheduling }
// MakePodScheduling creates a PodScheduling wrapper.
func MakePodScheduling() *PodSchedulingWrapper {
return &PodSchedulingWrapper{resourcev1alpha1.PodScheduling{}}
}
// FromPodScheduling creates a PodScheduling wrapper from some existing object.
func FromPodScheduling(other *resourcev1alpha1.PodScheduling) *PodSchedulingWrapper {
return &PodSchedulingWrapper{*other.DeepCopy()}
}
// Obj returns the inner object.
func (wrapper *PodSchedulingWrapper) Obj() *resourcev1alpha1.PodScheduling {
return &wrapper.PodScheduling
}
// Name sets `s` as the name of the inner object.
func (wrapper *PodSchedulingWrapper) Name(s string) *PodSchedulingWrapper {
wrapper.SetName(s)
return wrapper
}
// UID sets `s` as the UID of the inner object.
func (wrapper *PodSchedulingWrapper) UID(s string) *PodSchedulingWrapper {
wrapper.SetUID(types.UID(s))
return wrapper
}
// Namespace sets `s` as the namespace of the inner object.
func (wrapper *PodSchedulingWrapper) Namespace(s string) *PodSchedulingWrapper {
wrapper.SetNamespace(s)
return wrapper
}
// OwnerReference updates the owning controller of the inner object.
func (wrapper *PodSchedulingWrapper) OwnerReference(name, uid string, gvk schema.GroupVersionKind) *PodSchedulingWrapper {
wrapper.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: gvk.GroupVersion().String(),
Kind: gvk.Kind,
Name: name,
UID: types.UID(uid),
Controller: pointer.Bool(true),
},
}
return wrapper
}
// Label applies a {k,v} label pair to the inner object.
func (wrapper *PodSchedulingWrapper) Label(k, v string) *PodSchedulingWrapper {
if wrapper.Labels == nil {
wrapper.Labels = make(map[string]string)
}
wrapper.Labels[k] = v
return wrapper
}
// SelectedNode sets that field of the inner object.
func (wrapper *PodSchedulingWrapper) SelectedNode(s string) *PodSchedulingWrapper {
wrapper.Spec.SelectedNode = s
return wrapper
}
// PotentialNodes sets that field of the inner object.
func (wrapper *PodSchedulingWrapper) PotentialNodes(nodes ...string) *PodSchedulingWrapper {
wrapper.Spec.PotentialNodes = nodes
return wrapper
}
// ResourceClaims sets that field of the inner object.
func (wrapper *PodSchedulingWrapper) ResourceClaims(statuses ...resourcev1alpha1.ResourceClaimSchedulingStatus) *PodSchedulingWrapper {
wrapper.Status.ResourceClaims = statuses
return wrapper
}
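A hedged example of a PodScheduling fixture built with these setters, roughly as it would look while a node is being negotiated with resource drivers; all values are illustrative:

	scheduling := MakePodScheduling().
		Name("pod").
		Namespace("ns").
		OwnerReference("pod", "pod-uid", schema.GroupVersionKind{Version: "v1", Kind: "Pod"}).
		PotentialNodes("node-a", "node-b").
		SelectedNode("node-a").
		ResourceClaims(resourcev1alpha1.ResourceClaimSchedulingStatus{
			Name:            "my-claim",
			UnsuitableNodes: []string{"node-b"},
		}).
		Obj()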

View File

@ -564,6 +564,15 @@ func ClusterRoles() []rbacv1.ClusterRole {
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csidrivers").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csistoragecapacities").RuleOrDie(),
}
// Needed for dynamic resource allocation.
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
kubeSchedulerRules = append(kubeSchedulerRules,
rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("resourceclaims", "resourceclasses").RuleOrDie(),
rbacv1helpers.NewRule(ReadUpdate...).Groups(resourceGroup).Resources("resourceclaims/status").RuleOrDie(),
rbacv1helpers.NewRule(ReadWrite...).Groups(resourceGroup).Resources("podschedulings").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("podschedulings/status").RuleOrDie(),
)
}
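For orientation, the first appended rule expands roughly to the following PolicyRule, assuming Read carries the usual get/list/watch verbs used throughout this file:

	rbacv1.PolicyRule{
		Verbs:     []string{"get", "list", "watch"},
		APIGroups: []string{"resource.k8s.io"},
		Resources: []string{"resourceclaims", "resourceclasses"},
	}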
roles = append(roles, rbacv1.ClusterRole{
// a role to use for the kube-scheduler
ObjectMeta: metav1.ObjectMeta{Name: "system:kube-scheduler"},