diff --git a/pkg/scheduler/apis/config/v1/default_plugins.go b/pkg/scheduler/apis/config/v1/default_plugins.go index daeb3370ed0..3fc8c1bdf86 100644 --- a/pkg/scheduler/apis/config/v1/default_plugins.go +++ b/pkg/scheduler/apis/config/v1/default_plugins.go @@ -63,6 +63,23 @@ func applyFeatureGates(config *v1.Plugins) { if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) { config.MultiPoint.Enabled = append(config.MultiPoint.Enabled, v1.Plugin{Name: names.SchedulingGates}) } + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + // This plugin should come before DefaultPreemption because if + // there is a problem with a Pod and PostFilter gets called to + // resolve the problem, it is better to first deallocate an + // idle ResourceClaim than it is to evict some Pod that might + // be doing useful work. + for i := range config.MultiPoint.Enabled { + if config.MultiPoint.Enabled[i].Name == names.DefaultPreemption { + extended := make([]v1.Plugin, 0, len(config.MultiPoint.Enabled)+1) + extended = append(extended, config.MultiPoint.Enabled[:i]...) + extended = append(extended, v1.Plugin{Name: names.DynamicResources}) + extended = append(extended, config.MultiPoint.Enabled[i:]...) + config.MultiPoint.Enabled = extended + break + } + } + } } // mergePlugins merges the custom set into the given default one, handling disabled sets. diff --git a/pkg/scheduler/apis/config/v1/default_plugins_test.go b/pkg/scheduler/apis/config/v1/default_plugins_test.go index d9ceb2ae527..56008a8254b 100644 --- a/pkg/scheduler/apis/config/v1/default_plugins_test.go +++ b/pkg/scheduler/apis/config/v1/default_plugins_test.go @@ -97,6 +97,39 @@ func TestApplyFeatureGates(t *testing.T) { }, }, }, + { + name: "Feature gate DynamicResourceAllocation enabled", + features: map[featuregate.Feature]bool{ + features.DynamicResourceAllocation: true, + }, + wantConfig: &v1.Plugins{ + MultiPoint: v1.PluginSet{ + Enabled: []v1.Plugin{ + {Name: names.PrioritySort}, + {Name: names.NodeUnschedulable}, + {Name: names.NodeName}, + {Name: names.TaintToleration, Weight: pointer.Int32(3)}, + {Name: names.NodeAffinity, Weight: pointer.Int32(2)}, + {Name: names.NodePorts}, + {Name: names.NodeResourcesFit, Weight: pointer.Int32(1)}, + {Name: names.VolumeRestrictions}, + {Name: names.EBSLimits}, + {Name: names.GCEPDLimits}, + {Name: names.NodeVolumeLimits}, + {Name: names.AzureDiskLimits}, + {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, + {Name: names.PodTopologySpread, Weight: pointer.Int32(2)}, + {Name: names.InterPodAffinity, Weight: pointer.Int32(2)}, + {Name: names.DynamicResources}, + {Name: names.DefaultPreemption}, + {Name: names.NodeResourcesBalancedAllocation, Weight: pointer.Int32(1)}, + {Name: names.ImageLocality, Weight: pointer.Int32(1)}, + {Name: names.DefaultBinder}, + }, + }, + }, + }, } for _, test := range tests { diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 0c39dcde277..9f7b2b53d7d 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -25,12 +25,14 @@ import ( storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" corev1helpers "k8s.io/component-helpers/scheduling/corev1" corev1nodeaffinity "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" "k8s.io/klog/v2" + 
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename" @@ -376,6 +378,18 @@ func addAllEventHandlers( informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler( buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"), ) + case framework.PodScheduling: + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + _, _ = informerFactory.Resource().V1alpha1().PodSchedulings().Informer().AddEventHandler( + buildEvtResHandler(at, framework.PodScheduling, "PodScheduling"), + ) + } + case framework.ResourceClaim: + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + _, _ = informerFactory.Resource().V1alpha1().ResourceClaims().Informer().AddEventHandler( + buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"), + ) + } case framework.StorageClass: if at&framework.Add != 0 { informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler( diff --git a/pkg/scheduler/framework/plugins/dynamicresources/OWNERS b/pkg/scheduler/framework/plugins/dynamicresources/OWNERS new file mode 100644 index 00000000000..98ccc3b8054 --- /dev/null +++ b/pkg/scheduler/framework/plugins/dynamicresources/OWNERS @@ -0,0 +1,8 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +reviewers: + - klueska + - pohly + - bart0sh +labels: + - sig/node diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go new file mode 100644 index 00000000000..7d3c9f9f97a --- /dev/null +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -0,0 +1,804 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamicresources + +import ( + "context" + "errors" + "fmt" + "sort" + "sync" + + v1 "k8s.io/api/core/v1" + resourcev1alpha1 "k8s.io/api/resource/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes" + resourcev1alpha1listers "k8s.io/client-go/listers/resource/v1alpha1" + corev1helpers "k8s.io/component-helpers/scheduling/corev1" + "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" + "k8s.io/dynamic-resource-allocation/resourceclaim" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" +) + +const ( + // Name is the name of the plugin used in Registry and configurations. + Name = names.DynamicResources + + stateKey framework.StateKey = Name +) + +// The state is initialized in PreFilter phase. 
Because we save the pointer in +// framework.CycleState, in the later phases we don't need to call Write method +// to update the value +type stateData struct { + // A copy of all claims for the Pod (i.e. 1:1 match with + // pod.Spec.ResourceClaims), initially with the status from the start + // of the scheduling cycle. Each claim instance is read-only because it + // might come from the informer cache. The instances get replaced when + // the plugin itself successfully does an Update. + // + // Empty if the Pod has no claims. + claims []*resourcev1alpha1.ResourceClaim + + // The AvailableOnNodes node filters of the claims converted from the + // v1 API to nodeaffinity.NodeSelector by PreFilter for repeated + // evaluation in Filter. Nil for claims which don't have it. + availableOnNodes []*nodeaffinity.NodeSelector + + // The indices of all claims that: + // - are allocated + // - use delayed allocation + // - were not available on at least one node + // + // Set in parallel during Filter, so write access there must be + // protected by the mutex. Used by PostFilter. + unavailableClaims sets.Int + + // A pointer to the PodScheduling object for the pod, if one exists. + // Gets set on demand. + // + // Conceptually, this object belongs into the scheduler framework + // where it might get shared by different plugins. But in practice, + // it is currently only used by dynamic provisioning and thus + // managed entirely here. + podScheduling *resourcev1alpha1.PodScheduling + + // podSchedulingDirty is true if the current copy was locally modified. + podSchedulingDirty bool + + mutex sync.Mutex +} + +func (d *stateData) Clone() framework.StateData { + return d +} + +func (d *stateData) updateClaimStatus(ctx context.Context, clientset kubernetes.Interface, index int, claim *resourcev1alpha1.ResourceClaim) error { + // TODO (#113700): replace with patch operation. Beware that patching must only succeed if the + // object has not been modified in parallel by someone else. + claim, err := clientset.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) + // TODO: metric for update results, with the operation ("set selected + // node", "set PotentialNodes", etc.) as one dimension. + if err != nil { + return fmt.Errorf("update resource claim: %w", err) + } + + // Remember the new instance. This is relevant when the plugin must + // update the same claim multiple times (for example, first reserve + // the claim, then later remove the reservation), because otherwise the second + // update would fail with a "was modified" error. + d.claims[index] = claim + + return nil +} + +// initializePodScheduling can be called concurrently. It returns an existing PodScheduling +// object if there is one already, retrieves one if not, or as a last resort creates +// one from scratch. +func (d *stateData) initializePodScheduling(ctx context.Context, pod *v1.Pod, podSchedulingLister resourcev1alpha1listers.PodSchedulingLister) (*resourcev1alpha1.PodScheduling, error) { + // TODO (#113701): check if this mutex locking can be avoided by calling initializePodScheduling during PreFilter. 
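+	// Filter may call this concurrently for different nodes, so access to
+	// the shared state has to be serialized here.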
+ d.mutex.Lock() + defer d.mutex.Unlock() + + if d.podScheduling != nil { + return d.podScheduling, nil + } + + podScheduling, err := podSchedulingLister.PodSchedulings(pod.Namespace).Get(pod.Name) + switch { + case apierrors.IsNotFound(err): + controller := true + podScheduling = &resourcev1alpha1.PodScheduling{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: pod.Name, + UID: pod.UID, + Controller: &controller, + }, + }, + }, + } + err = nil + case err != nil: + return nil, err + default: + // We have an object, but it might be obsolete. + if !metav1.IsControlledBy(podScheduling, pod) { + return nil, fmt.Errorf("PodScheduling object with UID %s is not owned by Pod %s/%s", podScheduling.UID, pod.Namespace, pod.Name) + } + } + d.podScheduling = podScheduling + return podScheduling, err +} + +// publishPodScheduling creates or updates the PodScheduling object. +func (d *stateData) publishPodScheduling(ctx context.Context, clientset kubernetes.Interface, podScheduling *resourcev1alpha1.PodScheduling) error { + d.mutex.Lock() + defer d.mutex.Unlock() + + var err error + logger := klog.FromContext(ctx) + msg := "Updating PodScheduling" + if podScheduling.UID == "" { + msg = "Creating PodScheduling" + } + if loggerV := logger.V(6); loggerV.Enabled() { + // At a high enough log level, dump the entire object. + loggerV.Info(msg, "podschedulingDump", podScheduling) + } else { + logger.V(5).Info(msg, "podscheduling", klog.KObj(podScheduling)) + } + if podScheduling.UID == "" { + podScheduling, err = clientset.ResourceV1alpha1().PodSchedulings(podScheduling.Namespace).Create(ctx, podScheduling, metav1.CreateOptions{}) + } else { + // TODO (#113700): patch here to avoid racing with drivers which update the status. + podScheduling, err = clientset.ResourceV1alpha1().PodSchedulings(podScheduling.Namespace).Update(ctx, podScheduling, metav1.UpdateOptions{}) + } + if err != nil { + return err + } + d.podScheduling = podScheduling + d.podSchedulingDirty = false + return nil +} + +// storePodScheduling replaces the pod scheduling object in the state. +func (d *stateData) storePodScheduling(podScheduling *resourcev1alpha1.PodScheduling) { + d.mutex.Lock() + defer d.mutex.Unlock() + + d.podScheduling = podScheduling + d.podSchedulingDirty = true +} + +func statusForClaim(podScheduling *resourcev1alpha1.PodScheduling, podClaimName string) *resourcev1alpha1.ResourceClaimSchedulingStatus { + for _, status := range podScheduling.Status.ResourceClaims { + if status.Name == podClaimName { + return &status + } + } + return nil +} + +// dynamicResources is a plugin that ensures that ResourceClaims are allocated. +type dynamicResources struct { + enabled bool + clientset kubernetes.Interface + claimLister resourcev1alpha1listers.ResourceClaimLister + classLister resourcev1alpha1listers.ResourceClassLister + podSchedulingLister resourcev1alpha1listers.PodSchedulingLister +} + +// New initializes a new plugin and returns it. +func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) { + if !fts.EnableDynamicResourceAllocation { + // Disabled, won't do anything. 
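+		// The zero value leaves enabled as false, so all extension points
+		// below return early without doing any work.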
+ return &dynamicResources{}, nil + } + + return &dynamicResources{ + enabled: true, + clientset: fh.ClientSet(), + claimLister: fh.SharedInformerFactory().Resource().V1alpha1().ResourceClaims().Lister(), + classLister: fh.SharedInformerFactory().Resource().V1alpha1().ResourceClasses().Lister(), + podSchedulingLister: fh.SharedInformerFactory().Resource().V1alpha1().PodSchedulings().Lister(), + }, nil +} + +var _ framework.PreFilterPlugin = &dynamicResources{} +var _ framework.FilterPlugin = &dynamicResources{} +var _ framework.PostFilterPlugin = &dynamicResources{} +var _ framework.PreScorePlugin = &dynamicResources{} +var _ framework.ReservePlugin = &dynamicResources{} +var _ framework.EnqueueExtensions = &dynamicResources{} +var _ framework.PostBindPlugin = &dynamicResources{} + +// Name returns name of the plugin. It is used in logs, etc. +func (pl *dynamicResources) Name() string { + return Name +} + +// EventsToRegister returns the possible events that may make a Pod +// failed by this plugin schedulable. +func (pl *dynamicResources) EventsToRegister() []framework.ClusterEvent { + if !pl.enabled { + return nil + } + + events := []framework.ClusterEvent{ + // Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable. + {Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, + // When a driver has provided additional information, a pod waiting for that information + // may be schedulable. + // TODO (#113702): can we change this so that such an event does not trigger *all* pods? + // Yes: https://github.com/kubernetes/kubernetes/blob/abcbaed0784baf5ed2382aae9705a8918f2daa18/pkg/scheduler/eventhandlers.go#L70 + {Resource: framework.PodScheduling, ActionType: framework.Add | framework.Update}, + // A resource might depend on node labels for topology filtering. + // A new or updated node may make pods schedulable. + {Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}, + } + return events +} + +// podResourceClaims returns the ResourceClaims for all pod.Spec.PodResourceClaims. +func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha1.ResourceClaim, error) { + claims := make([]*resourcev1alpha1.ResourceClaim, 0, len(pod.Spec.ResourceClaims)) + for _, resource := range pod.Spec.ResourceClaims { + claimName := resourceclaim.Name(pod, &resource) + isEphemeral := resource.Source.ResourceClaimTemplateName != nil + claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(claimName) + if err != nil { + // The error usually has already enough context ("resourcevolumeclaim "myclaim" not found"), + // but we can do better for generic ephemeral inline volumes where that situation + // is normal directly after creating a pod. + if isEphemeral && apierrors.IsNotFound(err) { + err = fmt.Errorf("waiting for dynamic resource controller to create the resourceclaim %q", claimName) + } + return nil, err + } + + if claim.DeletionTimestamp != nil { + return nil, fmt.Errorf("resourceclaim %q is being deleted", claim.Name) + } + + if isEphemeral { + if err := resourceclaim.IsForPod(pod, claim); err != nil { + return nil, err + } + } + // We store the pointer as returned by the lister. The + // assumption is that if a claim gets modified while our code + // runs, the cache will store a new pointer, not mutate the + // existing object that we point to here. 
+ claims = append(claims, claim) + } + return claims, nil +} + +// PreFilter invoked at the prefilter extension point to check if pod has all +// immediate claims bound. UnschedulableAndUnresolvable is returned if +// the pod cannot be scheduled at the moment on any node. +func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + if !pl.enabled { + return nil, nil + } + logger := klog.FromContext(ctx) + + // If the pod does not reference any claim, we don't need to do + // anything for it. We just initialize an empty state to record that + // observation for the other functions. This gets updated below + // if we get that far. + s := &stateData{} + state.Write(stateKey, s) + + claims, err := pl.podResourceClaims(pod) + if err != nil { + return nil, statusUnschedulable(logger, err.Error()) + } + logger.V(5).Info("pod resource claims", "pod", klog.KObj(pod), "resourceclaims", klog.KObjs(claims)) + // If the pod does not reference any claim, we don't need to do + // anything for it. + if len(claims) == 0 { + return nil, nil + } + + s.availableOnNodes = make([]*nodeaffinity.NodeSelector, len(claims)) + for index, claim := range claims { + if claim.Spec.AllocationMode == resourcev1alpha1.AllocationModeImmediate && + claim.Status.Allocation == nil { + // This will get resolved by the resource driver. + return nil, statusUnschedulable(logger, "unallocated immediate resourceclaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) + } + if claim.Status.DeallocationRequested { + // This will get resolved by the resource driver. + return nil, statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) + } + if claim.Status.Allocation != nil && + !resourceclaim.CanBeReserved(claim) && + !resourceclaim.IsReservedForPod(pod, claim) { + // Resource is in use. The pod has to wait. + return nil, statusUnschedulable(logger, "resourceclaim in use", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) + } + if claim.Status.Allocation != nil && + claim.Status.Allocation.AvailableOnNodes != nil { + nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes) + if err != nil { + return nil, statusError(logger, err) + } + s.availableOnNodes[index] = nodeSelector + } + } + + s.claims = claims + state.Write(stateKey, s) + return nil, nil +} + +// PreFilterExtensions returns prefilter extensions, pod add and remove. +func (pl *dynamicResources) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + +func getStateData(cs *framework.CycleState) (*stateData, error) { + state, err := cs.Read(stateKey) + if err != nil { + return nil, err + } + s, ok := state.(*stateData) + if !ok { + return nil, errors.New("unable to convert state into stateData") + } + return s, nil +} + +// Filter invoked at the filter extension point. +// It evaluates if a pod can fit due to the resources it requests, +// for both allocated and unallocated claims. +// +// For claims that are bound, then it checks that the node affinity is +// satisfied by the given node. +// +// For claims that are unbound, it checks whether the claim might get allocated +// for the node. 
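+//
+// Filter runs in parallel for the candidate nodes, so writes to the shared
+// cycle state (unavailableClaims, the PodScheduling object) are guarded by
+// stateData.mutex.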
+func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + if !pl.enabled { + return nil + } + state, err := getStateData(cs) + if err != nil { + return statusError(klog.FromContext(ctx), err) + } + if len(state.claims) == 0 { + return nil + } + + logger := klog.FromContext(ctx) + node := nodeInfo.Node() + + var unavailableClaims []int + for index, claim := range state.claims { + logger.V(10).Info("filtering based on resource claims of the pod", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim)) + switch { + case claim.Status.Allocation != nil: + if nodeSelector := state.availableOnNodes[index]; nodeSelector != nil { + if !nodeSelector.Match(node) { + logger.V(5).Info("AvailableOnNodes does not match", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim)) + unavailableClaims = append(unavailableClaims, index) + } + } + case claim.Status.DeallocationRequested: + // We shouldn't get here. PreFilter already checked this. + return statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim)) + case claim.Spec.AllocationMode == resourcev1alpha1.AllocationModeWaitForFirstConsumer: + // The ResourceClass might have a node filter. This is + // useful for trimming the initial set of potential + // nodes before we ask the driver(s) for information + // about the specific pod. + class, err := pl.classLister.Get(claim.Spec.ResourceClassName) + if err != nil { + // If the class does not exist, then allocation cannot proceed. + return statusError(logger, fmt.Errorf("look up resource class: %v", err)) + } + if class.SuitableNodes != nil { + // TODO (#113700): parse class.SuitableNodes once in PreFilter, reuse result. + matches, err := corev1helpers.MatchNodeSelectorTerms(node, class.SuitableNodes) + if err != nil { + return statusError(logger, fmt.Errorf("potential node filter: %v", err)) + } + if !matches { + return statusUnschedulable(logger, "excluded by resource class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclass", klog.KObj(class)) + } + } + + // Now we need information from drivers. + podScheduling, err := state.initializePodScheduling(ctx, pod, pl.podSchedulingLister) + if err != nil { + return statusError(logger, err) + } + status := statusForClaim(podScheduling, pod.Spec.ResourceClaims[index].Name) + if status != nil { + for _, unsuitableNode := range status.UnsuitableNodes { + if node.Name == unsuitableNode { + return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes) + } + } + } + default: + // This should have been delayed allocation. Immediate + // allocation was already checked for in PreFilter. + return statusError(logger, fmt.Errorf("internal error, unexpected allocation mode %v", claim.Spec.AllocationMode)) + } + } + + if len(unavailableClaims) > 0 { + state.mutex.Lock() + defer state.mutex.Unlock() + if state.unavailableClaims == nil { + state.unavailableClaims = sets.NewInt() + } + + for index := range unavailableClaims { + claim := state.claims[index] + // Deallocation makes more sense for claims with + // delayed allocation. Claims with immediate allocation + // would just get allocated again for a random node, + // which is unlikely to help the pod. 
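+			// Only claims with delayed allocation are remembered here;
+			// PostFilter may pick one of them for deallocation.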
+ if claim.Spec.AllocationMode == resourcev1alpha1.AllocationModeWaitForFirstConsumer { + state.unavailableClaims.Insert(unavailableClaims...) + } + } + return statusUnschedulable(logger, "resourceclaim not available on the node", "pod", klog.KObj(pod)) + } + + return nil +} + +// PostFilter checks whether there are allocated claims that could get +// deallocated to help get the Pod schedulable. If yes, it picks one and +// requests its deallocation. This only gets called when filtering found no +// suitable node. +func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { + if !pl.enabled { + return nil, framework.NewStatus(framework.Unschedulable, "plugin disabled") + } + logger := klog.FromContext(ctx) + state, err := getStateData(cs) + if err != nil { + return nil, statusError(logger, err) + } + if len(state.claims) == 0 { + return nil, framework.NewStatus(framework.Unschedulable, "no new claims to deallocate") + } + + // Iterating over a map is random. This is intentional here, we want to + // pick one claim randomly because there is no better heuristic. + for index := range state.unavailableClaims { + claim := state.claims[index] + if len(claim.Status.ReservedFor) == 0 || + len(claim.Status.ReservedFor) == 1 && claim.Status.ReservedFor[0].UID == pod.UID { + claim := state.claims[index].DeepCopy() + claim.Status.DeallocationRequested = true + claim.Status.ReservedFor = nil + logger.V(5).Info("Requesting deallocation of ResourceClaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim)) + if err := state.updateClaimStatus(ctx, pl.clientset, index, claim); err != nil { + return nil, statusError(logger, err) + } + return nil, nil + } + } + return nil, framework.NewStatus(framework.Unschedulable, "still not schedulable") +} + +// PreScore is passed a list of all nodes that would fit the pod. Not all +// claims are necessarily allocated yet, so here we can set the SuitableNodes +// field for those which are pending. +func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status { + if !pl.enabled { + return nil + } + state, err := getStateData(cs) + if err != nil { + return statusError(klog.FromContext(ctx), err) + } + if len(state.claims) == 0 { + return nil + } + + logger := klog.FromContext(ctx) + podScheduling, err := state.initializePodScheduling(ctx, pod, pl.podSchedulingLister) + if err != nil { + return statusError(logger, err) + } + pending := false + for _, claim := range state.claims { + if claim.Status.Allocation == nil { + pending = true + } + } + if pending && !haveAllNodes(podScheduling.Spec.PotentialNodes, nodes) { + // Remember the potential nodes. The object will get created or + // updated in Reserve. This is both an optimization and + // covers the case that PreScore doesn't get called when there + // is only a single node. + logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes)) + podScheduling = podScheduling.DeepCopy() + numNodes := len(nodes) + if numNodes > resourcev1alpha1.PodSchedulingNodeListMaxSize { + numNodes = resourcev1alpha1.PodSchedulingNodeListMaxSize + } + podScheduling.Spec.PotentialNodes = make([]string, 0, numNodes) + if numNodes == len(nodes) { + // Copy all node names. 
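+			// All nodes fit within the PodSchedulingNodeListMaxSize limit,
+			// so no random sampling is needed.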
+ for _, node := range nodes { + podScheduling.Spec.PotentialNodes = append(podScheduling.Spec.PotentialNodes, node.Name) + } + } else { + // Select a random subset of the nodes to comply with + // the PotentialNodes length limit. Randomization is + // done for us by Go which iterates over map entries + // randomly. + nodeNames := map[string]struct{}{} + for _, node := range nodes { + nodeNames[node.Name] = struct{}{} + } + for nodeName := range nodeNames { + if len(podScheduling.Spec.PotentialNodes) >= resourcev1alpha1.PodSchedulingNodeListMaxSize { + break + } + podScheduling.Spec.PotentialNodes = append(podScheduling.Spec.PotentialNodes, nodeName) + } + } + sort.Strings(podScheduling.Spec.PotentialNodes) + state.storePodScheduling(podScheduling) + } + logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", nodes) + return nil +} + +func haveAllNodes(nodeNames []string, nodes []*v1.Node) bool { + for _, node := range nodes { + if !haveNode(nodeNames, node.Name) { + return false + } + } + return true +} + +func haveNode(nodeNames []string, nodeName string) bool { + for _, n := range nodeNames { + if n == nodeName { + return true + } + } + return false +} + +// Reserve reserves claims for the pod. +func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { + if !pl.enabled { + return nil + } + state, err := getStateData(cs) + if err != nil { + return statusError(klog.FromContext(ctx), err) + } + if len(state.claims) == 0 { + return nil + } + + numDelayedAllocationPending := 0 + numClaimsWithStatusInfo := 0 + logger := klog.FromContext(ctx) + podScheduling, err := state.initializePodScheduling(ctx, pod, pl.podSchedulingLister) + if err != nil { + return statusError(logger, err) + } + for index, claim := range state.claims { + if claim.Status.Allocation != nil { + // Allocated, but perhaps not reserved yet. + if resourceclaim.IsReservedForPod(pod, claim) { + logger.V(5).Info("is reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim)) + continue + } + claim := claim.DeepCopy() + claim.Status.ReservedFor = append(claim.Status.ReservedFor, + resourcev1alpha1.ResourceClaimConsumerReference{ + Resource: "pods", + Name: pod.Name, + UID: pod.UID, + }) + logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim)) + _, err := pl.clientset.ResourceV1alpha1().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) + // TODO: metric for update errors. + if err != nil { + return statusError(logger, err) + } + // If we get here, we know that reserving the claim for + // the pod worked and we can proceed with scheduling + // it. + } else { + // Must be delayed allocation. + numDelayedAllocationPending++ + + // Did the driver provide information that steered node + // selection towards a node that it can support? + if statusForClaim(podScheduling, pod.Spec.ResourceClaims[index].Name) != nil { + numClaimsWithStatusInfo++ + } + } + } + + if numDelayedAllocationPending == 0 { + // Nothing left to do. + return nil + } + + podSchedulingDirty := state.podSchedulingDirty + if len(podScheduling.Spec.PotentialNodes) == 0 { + // PreScore was not called, probably because there was + // only one candidate. We need to ask whether that + // node is suitable, otherwise the scheduler will pick + // it forever even when it cannot satisfy the claim. 
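+		// Work on a copy so that the object in the informer cache is not
+		// modified.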
+ podScheduling = podScheduling.DeepCopy() + podScheduling.Spec.PotentialNodes = []string{nodeName} + logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) + podSchedulingDirty = true + } + + // When there is only one pending resource, we can go ahead with + // requesting allocation even when we don't have the information from + // the driver yet. Otherwise we wait for information before blindly + // making a decision that might have to be reversed later. + if numDelayedAllocationPending == 1 || numClaimsWithStatusInfo == numDelayedAllocationPending { + podScheduling = podScheduling.DeepCopy() + // TODO: can we increase the chance that the scheduler picks + // the same node as before when allocation is on-going, + // assuming that that node still fits the pod? Picking a + // different node may lead to some claims being allocated for + // one node and others for another, which then would have to be + // resolved with deallocation. + podScheduling.Spec.SelectedNode = nodeName + logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) + if err := state.publishPodScheduling(ctx, pl.clientset, podScheduling); err != nil { + return statusError(logger, err) + } + return statusUnschedulable(logger, "waiting for resource driver to allocate resource", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}) + } + + // May have been modified earlier in PreScore or above. + if podSchedulingDirty { + if err := state.publishPodScheduling(ctx, pl.clientset, podScheduling); err != nil { + return statusError(logger, err) + } + } + + // More than one pending claim and not enough information about all of them. + // + // TODO: can or should we ensure that scheduling gets aborted while + // waiting for resources *before* triggering delayed volume + // provisioning? On the one hand, volume provisioning is currently + // irreversible, so it better should come last. On the other hand, + // triggering both in parallel might be faster. + return statusUnschedulable(logger, "waiting for resource driver to provide information", "pod", klog.KObj(pod)) +} + +// Unreserve clears the ReservedFor field for all claims. +// It's idempotent, and does nothing if no state found for the given pod. +func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) { + if !pl.enabled { + return + } + state, err := getStateData(cs) + if err != nil { + return + } + if len(state.claims) == 0 { + return + } + + logger := klog.FromContext(ctx) + for index, claim := range state.claims { + if claim.Status.Allocation != nil && + resourceclaim.IsReservedForPod(pod, claim) { + // Remove pod from ReservedFor. + claim := claim.DeepCopy() + reservedFor := make([]resourcev1alpha1.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor)-1) + for _, reserved := range claim.Status.ReservedFor { + // TODO: can UID be assumed to be unique all resources or do we also need to compare Group/Version/Resource? + if reserved.UID != pod.UID { + reservedFor = append(reservedFor, reserved) + } + } + claim.Status.ReservedFor = reservedFor + logger.V(5).Info("unreserve", "resourceclaim", klog.KObj(claim)) + if err := state.updateClaimStatus(ctx, pl.clientset, index, claim); err != nil { + // We will get here again when pod scheduling + // is retried. 
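+				// Log and continue with the remaining claims so that their
+				// reservations still get removed.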
+ logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim)) + } + } + } +} + +// PostBind is called after a pod is successfully bound to a node. Now we are +// sure that a PodScheduling object, if it exists, is definitely not going to +// be needed anymore and can delete it. This is a one-shot thing, there won't +// be any retries. This is okay because it should usually work and in those +// cases where it doesn't, the garbage collector will eventually clean up. +func (pl *dynamicResources) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) { + if !pl.enabled { + return + } + state, err := getStateData(cs) + if err != nil { + return + } + if len(state.claims) == 0 { + return + } + + // We cannot know for sure whether the PodScheduling object exists. We + // might have created it in the previous pod scheduling cycle and not + // have it in our informer cache yet. Let's try to delete, just to be + // on the safe side. + logger := klog.FromContext(ctx) + err = pl.clientset.ResourceV1alpha1().PodSchedulings(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + switch { + case apierrors.IsNotFound(err): + logger.V(5).Info("no PodScheduling object to delete") + case err != nil: + logger.Error(err, "delete PodScheduling") + default: + logger.V(5).Info("PodScheduling object deleted") + } +} + +// statusUnschedulable ensures that there is a log message associated with the +// line where the status originated. +func statusUnschedulable(logger klog.Logger, reason string, kv ...interface{}) *framework.Status { + if loggerV := logger.V(5); loggerV.Enabled() { + helper, loggerV := loggerV.WithCallStackHelper() + helper() + kv = append(kv, "reason", reason) + // nolint: logcheck // warns because it cannot check key/values + loggerV.Info("pod unschedulable", kv...) + } + return framework.NewStatus(framework.UnschedulableAndUnresolvable, reason) +} + +// statusError ensures that there is a log message associated with the +// line where the error originated. +func statusError(logger klog.Logger, err error, kv ...interface{}) *framework.Status { + if loggerV := logger.V(5); loggerV.Enabled() { + helper, loggerV := loggerV.WithCallStackHelper() + helper() + // nolint: logcheck // warns because it cannot check key/values + loggerV.Error(err, "dynamic resource plugin failed", kv...) + } + return framework.AsStatus(err) +} diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go new file mode 100644 index 00000000000..2d135e8bf82 --- /dev/null +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -0,0 +1,789 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package dynamicresources + +import ( + "context" + "errors" + "fmt" + "sort" + "sync" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + v1 "k8s.io/api/core/v1" + resourcev1alpha1 "k8s.io/api/resource/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + cgotesting "k8s.io/client-go/testing" + "k8s.io/klog/v2/ktesting" + _ "k8s.io/klog/v2/ktesting/init" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + st "k8s.io/kubernetes/pkg/scheduler/testing" +) + +var ( + podKind = v1.SchemeGroupVersion.WithKind("Pod") + + podName = "my-pod" + podUID = "1234" + resourceName = "my-resource" + resourceName2 = resourceName + "-2" + claimName = podName + "-" + resourceName + claimName2 = podName + "-" + resourceName + "-2" + className = "my-resource-class" + namespace = "default" + + resourceClass = &resourcev1alpha1.ResourceClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: className, + }, + DriverName: "some-driver", + } + + podWithClaimName = st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimName: &claimName}}). + Obj() + otherPodWithClaimName = st.MakePod().Name(podName).Namespace(namespace). + UID(podUID + "-II"). + PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimName: &claimName}}). + Obj() + podWithClaimTemplate = st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimTemplateName: &claimName}}). + Obj() + podWithTwoClaimNames = st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimName: &claimName}}). + PodResourceClaims(v1.PodResourceClaim{Name: resourceName2, Source: v1.ClaimSource{ResourceClaimName: &claimName2}}). + Obj() + + workerNode = &st.MakeNode().Name("worker").Label("nodename", "worker").Node + + claim = st.MakeResourceClaim(). + Name(claimName). + Namespace(namespace). + ResourceClassName(className). + Obj() + pendingImmediateClaim = st.FromResourceClaim(claim). + AllocationMode(resourcev1alpha1.AllocationModeImmediate). + Obj() + pendingDelayedClaim = st.FromResourceClaim(claim). + AllocationMode(resourcev1alpha1.AllocationModeWaitForFirstConsumer). + Obj() + pendingDelayedClaim2 = st.FromResourceClaim(pendingDelayedClaim). + Name(claimName2). + Obj() + deallocatingClaim = st.FromResourceClaim(pendingImmediateClaim). + Allocation(&resourcev1alpha1.AllocationResult{}). + DeallocationRequested(true). + Obj() + inUseClaim = st.FromResourceClaim(pendingImmediateClaim). + Allocation(&resourcev1alpha1.AllocationResult{}). + ReservedFor(resourcev1alpha1.ResourceClaimConsumerReference{UID: types.UID(podUID)}). + Obj() + allocatedClaim = st.FromResourceClaim(pendingDelayedClaim). + OwnerReference(podName, podUID, podKind). + Allocation(&resourcev1alpha1.AllocationResult{}). + Obj() + allocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedClaim). 
+ Allocation(&resourcev1alpha1.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("no-such-label", []string{"no-such-value"}).Obj()}). + Obj() + allocatedImmediateClaimWithWrongTopology = st.FromResourceClaim(allocatedDelayedClaimWithWrongTopology). + AllocationMode(resourcev1alpha1.AllocationModeImmediate). + Obj() + allocatedClaimWithGoodTopology = st.FromResourceClaim(allocatedClaim). + Allocation(&resourcev1alpha1.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("nodename", []string{"worker"}).Obj()}). + Obj() + otherClaim = st.MakeResourceClaim(). + Name("not-my-claim"). + Namespace(namespace). + ResourceClassName(className). + Obj() + + scheduling = st.MakePodScheduling().Name(podName).Namespace(namespace). + OwnerReference(podName, podUID, podKind). + Obj() + schedulingPotential = st.FromPodScheduling(scheduling). + PotentialNodes(workerNode.Name). + Obj() + schedulingSelectedPotential = st.FromPodScheduling(schedulingPotential). + SelectedNode(workerNode.Name). + Obj() + schedulingInfo = st.FromPodScheduling(schedulingPotential). + ResourceClaims(resourcev1alpha1.ResourceClaimSchedulingStatus{Name: resourceName}, + resourcev1alpha1.ResourceClaimSchedulingStatus{Name: resourceName2}). + Obj() +) + +// result defines the expected outcome of some operation. It covers +// operation's status and the state of the world (= objects). +type result struct { + status *framework.Status + // changes contains a mapping of name to an update function for + // the corresponding object. These functions apply exactly the expected + // changes to a copy of the object as it existed before the operation. + changes change + + // added contains objects created by the operation. + added []metav1.Object + + // removed contains objects deleted by the operation. + removed []metav1.Object +} + +// change contains functions for modifying objects of a certain type. These +// functions will get called for all objects of that type. If they needs to +// make changes only to a particular instance, then it must check the name. +type change struct { + scheduling func(*resourcev1alpha1.PodScheduling) *resourcev1alpha1.PodScheduling + claim func(*resourcev1alpha1.ResourceClaim) *resourcev1alpha1.ResourceClaim +} +type perNodeResult map[string]result + +func (p perNodeResult) forNode(nodeName string) result { + if p == nil { + return result{} + } + return p[nodeName] +} + +type want struct { + preFilterResult *framework.PreFilterResult + prefilter result + filter perNodeResult + prescore result + reserve result + unreserve result + postbind result + postFilterResult *framework.PostFilterResult + postfilter result +} + +// prepare contains changes for objects in the API server. +// Those changes are applied before running the steps. This can +// be used to simulate concurrent changes by some other entities +// like a resource driver. 
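+//
+// Each field holds the change that gets applied immediately before the
+// extension point of the same name is invoked.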
+type prepare struct { + filter change + prescore change + reserve change + unreserve change + postbind change + postfilter change +} + +func TestPlugin(t *testing.T) { + testcases := map[string]struct { + nodes []*v1.Node // default if unset is workerNode + pod *v1.Pod + claims []*resourcev1alpha1.ResourceClaim + classes []*resourcev1alpha1.ResourceClass + schedulings []*resourcev1alpha1.PodScheduling + + prepare prepare + want want + }{ + "empty": { + pod: st.MakePod().Name("foo").Namespace("default").Obj(), + }, + "claim-reference": { + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{allocatedClaim, otherClaim}, + }, + "claim-template": { + pod: podWithClaimTemplate, + claims: []*resourcev1alpha1.ResourceClaim{allocatedClaim, otherClaim}, + }, + "missing-claim": { + pod: podWithClaimTemplate, + want: want{ + prefilter: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `waiting for dynamic resource controller to create the resourceclaim "my-pod-my-resource"`), + }, + postfilter: result{ + status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), + }, + }, + }, + "waiting-for-immediate-allocation": { + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{pendingImmediateClaim}, + want: want{ + prefilter: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `unallocated immediate resourceclaim`), + }, + postfilter: result{ + status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), + }, + }, + }, + "waiting-for-deallocation": { + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{deallocatingClaim}, + want: want{ + prefilter: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim must be reallocated`), + }, + postfilter: result{ + status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), + }, + }, + }, + "delayed-allocation-missing-class": { + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{pendingDelayedClaim}, + want: want{ + filter: perNodeResult{ + workerNode.Name: { + status: framework.AsStatus(fmt.Errorf(`look up resource class: resourceclass.resource.k8s.io "%s" not found`, className)), + }, + }, + postfilter: result{ + status: framework.NewStatus(framework.Unschedulable, `still not schedulable`), + }, + }, + }, + "delayed-allocation-scheduling-select-immediately": { + // Create the PodScheduling object, ask for information + // and select a node. + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{pendingDelayedClaim}, + classes: []*resourcev1alpha1.ResourceClass{resourceClass}, + want: want{ + reserve: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `waiting for resource driver to allocate resource`), + added: []metav1.Object{schedulingSelectedPotential}, + }, + }, + }, + "delayed-allocation-scheduling-ask": { + // Create the PodScheduling object, ask for + // information, but do not select a node because + // there are multiple claims. 
+ pod: podWithTwoClaimNames, + claims: []*resourcev1alpha1.ResourceClaim{pendingDelayedClaim, pendingDelayedClaim2}, + classes: []*resourcev1alpha1.ResourceClass{resourceClass}, + want: want{ + reserve: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `waiting for resource driver to provide information`), + added: []metav1.Object{schedulingPotential}, + }, + }, + }, + "delayed-allocation-scheduling-finish": { + // Use the populated PodScheduling object to select a + // node. + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{pendingDelayedClaim}, + schedulings: []*resourcev1alpha1.PodScheduling{schedulingInfo}, + classes: []*resourcev1alpha1.ResourceClass{resourceClass}, + want: want{ + reserve: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `waiting for resource driver to allocate resource`), + changes: change{ + scheduling: func(in *resourcev1alpha1.PodScheduling) *resourcev1alpha1.PodScheduling { + return st.FromPodScheduling(in). + SelectedNode(workerNode.Name). + Obj() + }, + }, + }, + }, + }, + "delayed-allocation-scheduling-finish-concurrent-label-update": { + // Use the populated PodScheduling object to select a + // node. + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{pendingDelayedClaim}, + schedulings: []*resourcev1alpha1.PodScheduling{schedulingInfo}, + classes: []*resourcev1alpha1.ResourceClass{resourceClass}, + prepare: prepare{ + reserve: change{ + scheduling: func(in *resourcev1alpha1.PodScheduling) *resourcev1alpha1.PodScheduling { + // This does not actually conflict with setting the + // selected node, but because the plugin is not using + // patching yet, Update nonetheless fails. + return st.FromPodScheduling(in). + Label("hello", "world"). + Obj() + }, + }, + }, + want: want{ + reserve: result{ + status: framework.AsStatus(errors.New(`ResourceVersion must match the object that gets updated`)), + }, + }, + }, + "delayed-allocation-scheduling-completed": { + // Remove PodScheduling object once the pod is scheduled. + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{allocatedClaim}, + schedulings: []*resourcev1alpha1.PodScheduling{schedulingInfo}, + classes: []*resourcev1alpha1.ResourceClass{resourceClass}, + want: want{ + reserve: result{ + changes: change{ + claim: func(in *resourcev1alpha1.ResourceClaim) *resourcev1alpha1.ResourceClaim { + return st.FromResourceClaim(in). + ReservedFor(resourcev1alpha1.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}). + Obj() + }, + }, + }, + postbind: result{ + removed: []metav1.Object{schedulingInfo}, + }, + }, + }, + "in-use-by-other": { + nodes: []*v1.Node{}, + pod: otherPodWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{inUseClaim}, + classes: []*resourcev1alpha1.ResourceClass{}, + schedulings: []*resourcev1alpha1.PodScheduling{}, + prepare: prepare{}, + want: want{ + prefilter: result{ + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim in use`), + }, + postfilter: result{ + status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`), + }, + }, + }, + "wrong-topology-delayed-allocation": { + // PostFilter tries to get the pod scheduleable by + // deallocating the claim. 
+ pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{allocatedDelayedClaimWithWrongTopology}, + want: want{ + filter: perNodeResult{ + workerNode.Name: { + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim not available on the node`), + }, + }, + postfilter: result{ + // Claims with delayed allocation get deallocated. + changes: change{ + claim: func(in *resourcev1alpha1.ResourceClaim) *resourcev1alpha1.ResourceClaim { + return st.FromResourceClaim(in). + DeallocationRequested(true). + Obj() + }, + }, + }, + }, + }, + "wrong-topology-immediate-allocation": { + // PostFilter tries to get the pod scheduleable by + // deallocating the claim. + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{allocatedImmediateClaimWithWrongTopology}, + want: want{ + filter: perNodeResult{ + workerNode.Name: { + status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim not available on the node`), + }, + }, + postfilter: result{ + // Claims with immediate allocation don't. They would just get allocated again right + // away, without considering the needs of the pod. + status: framework.NewStatus(framework.Unschedulable, `still not schedulable`), + }, + }, + }, + "good-topology": { + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{allocatedClaimWithGoodTopology}, + want: want{ + reserve: result{ + changes: change{ + claim: func(in *resourcev1alpha1.ResourceClaim) *resourcev1alpha1.ResourceClaim { + return st.FromResourceClaim(in). + ReservedFor(resourcev1alpha1.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}). + Obj() + }, + }, + }, + }, + }, + "reserved-okay": { + pod: podWithClaimName, + claims: []*resourcev1alpha1.ResourceClaim{inUseClaim}, + }, + } + + for name, tc := range testcases { + // We can run in parallel because logging is per-test. 
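+		// Capture the loop variable so that each parallel subtest works on
+		// its own copy.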
+ tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() + nodes := tc.nodes + if nodes == nil { + nodes = []*v1.Node{workerNode} + } + testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings) + + initialObjects := testCtx.listAll(t) + result, status := testCtx.p.PreFilter(testCtx.ctx, testCtx.state, tc.pod) + t.Run("prefilter", func(t *testing.T) { + assert.Equal(t, tc.want.preFilterResult, result) + testCtx.verify(t, tc.want.prefilter, initialObjects, result, status) + }) + unschedulable := status.Code() != framework.Success + + var potentialNodes []*v1.Node + + initialObjects = testCtx.listAll(t) + testCtx.updateAPIServer(t, initialObjects, tc.prepare.filter) + if !unschedulable { + for _, nodeInfo := range testCtx.nodeInfos { + initialObjects = testCtx.listAll(t) + status := testCtx.p.Filter(testCtx.ctx, testCtx.state, tc.pod, nodeInfo) + nodeName := nodeInfo.Node().Name + t.Run(fmt.Sprintf("filter/%s", nodeInfo.Node().Name), func(t *testing.T) { + testCtx.verify(t, tc.want.filter.forNode(nodeName), initialObjects, nil, status) + }) + if status.Code() != framework.Success { + unschedulable = true + } else { + potentialNodes = append(potentialNodes, nodeInfo.Node()) + } + } + } + + if !unschedulable && len(potentialNodes) > 0 { + initialObjects = testCtx.listAll(t) + initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.prescore) + status := testCtx.p.PreScore(testCtx.ctx, testCtx.state, tc.pod, potentialNodes) + t.Run("prescore", func(t *testing.T) { + testCtx.verify(t, tc.want.prescore, initialObjects, nil, status) + }) + if status.Code() != framework.Success { + unschedulable = true + } + } + + var selectedNode *v1.Node + if !unschedulable && len(potentialNodes) > 0 { + selectedNode = potentialNodes[0] + + initialObjects = testCtx.listAll(t) + initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.reserve) + status := testCtx.p.Reserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Name) + t.Run("reserve", func(t *testing.T) { + testCtx.verify(t, tc.want.reserve, initialObjects, nil, status) + }) + if status.Code() != framework.Success { + unschedulable = true + } + } + + if selectedNode != nil { + if unschedulable { + initialObjects = testCtx.listAll(t) + initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.unreserve) + testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Name) + t.Run("unreserve", func(t *testing.T) { + testCtx.verify(t, tc.want.unreserve, initialObjects, nil, status) + }) + } else { + initialObjects = testCtx.listAll(t) + initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.postbind) + testCtx.p.PostBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Name) + t.Run("postbind", func(t *testing.T) { + testCtx.verify(t, tc.want.postbind, initialObjects, nil, status) + }) + } + } else { + initialObjects = testCtx.listAll(t) + initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.postfilter) + result, status := testCtx.p.PostFilter(testCtx.ctx, testCtx.state, tc.pod, nil /* filteredNodeStatusMap not used by plugin */) + t.Run("postfilter", func(t *testing.T) { + assert.Equal(t, tc.want.postFilterResult, result) + testCtx.verify(t, tc.want.postfilter, initialObjects, nil, status) + }) + } + }) + } +} + +type testContext struct { + ctx context.Context + client *fake.Clientset + p *dynamicResources + nodeInfos []*framework.NodeInfo + state *framework.CycleState +} + +func (tc *testContext) verify(t *testing.T, expected result, initialObjects 
[]metav1.Object, result interface{}, status *framework.Status) { + t.Helper() + assert.Equal(t, expected.status, status) + objects := tc.listAll(t) + wantObjects := update(t, initialObjects, expected.changes) + for _, add := range expected.added { + wantObjects = append(wantObjects, add) + } + for _, remove := range expected.removed { + for i, obj := range wantObjects { + // This is a bit relaxed (no GVR comparison, no UID + // comparison) to simplify writing the test cases. + if obj.GetName() == remove.GetName() && obj.GetNamespace() == remove.GetNamespace() { + wantObjects = append(wantObjects[0:i], wantObjects[i+1:]...) + break + } + } + } + sortObjects(wantObjects) + stripObjects(wantObjects) + stripObjects(objects) + assert.Equal(t, wantObjects, objects) +} + +// setGVK is implemented by metav1.TypeMeta and thus all API objects, in +// contrast to metav1.Type, which is not (?!) implemented. +type setGVK interface { + SetGroupVersionKind(gvk schema.GroupVersionKind) +} + +// stripObjects removes certain fields (Kind, APIVersion, etc.) which are not +// important and might not be set. +func stripObjects(objects []metav1.Object) { + for _, obj := range objects { + obj.SetResourceVersion("") + obj.SetUID("") + if objType, ok := obj.(setGVK); ok { + objType.SetGroupVersionKind(schema.GroupVersionKind{}) + } + } +} + +func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) { + t.Helper() + claims, err := tc.client.ResourceV1alpha1().ResourceClaims("").List(tc.ctx, metav1.ListOptions{}) + require.NoError(t, err, "list claims") + for _, claim := range claims.Items { + objects = append(objects, &claim) + } + schedulings, err := tc.client.ResourceV1alpha1().PodSchedulings("").List(tc.ctx, metav1.ListOptions{}) + require.NoError(t, err, "list pod scheduling") + for _, scheduling := range schedulings.Items { + objects = append(objects, &scheduling) + } + + sortObjects(objects) + return +} + +// updateAPIServer modifies objects and stores any changed object in the API server. +func (tc *testContext) updateAPIServer(t *testing.T, objects []metav1.Object, updates change) []metav1.Object { + modified := update(t, objects, updates) + for i := range modified { + obj := modified[i] + if diff := cmp.Diff(objects[i], obj); diff != "" { + t.Logf("Updating %T %q, diff (-old, +new):\n%s", obj, obj.GetName(), diff) + switch obj := obj.(type) { + case *resourcev1alpha1.ResourceClaim: + obj, err := tc.client.ResourceV1alpha1().ResourceClaims(obj.Namespace).Update(tc.ctx, obj, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("unexpected error during prepare update: %v", err) + } + modified[i] = obj + case *resourcev1alpha1.PodScheduling: + obj, err := tc.client.ResourceV1alpha1().PodSchedulings(obj.Namespace).Update(tc.ctx, obj, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("unexpected error during prepare update: %v", err) + } + modified[i] = obj + default: + t.Fatalf("unsupported object type %T", obj) + } + } + } + return modified +} + +func sortObjects(objects []metav1.Object) { + sort.Slice(objects, func(i, j int) bool { + if objects[i].GetNamespace() < objects[j].GetNamespace() { + return true + } + return objects[i].GetName() < objects[j].GetName() + }) +} + +// update walks through all existing objects, finds the corresponding update +// function based on name and kind, and replaces those objects that have an +// update function. The rest is left unchanged. 
+func update(t *testing.T, objects []metav1.Object, updates change) []metav1.Object { + var updated []metav1.Object + + for _, obj := range objects { + switch in := obj.(type) { + case *resourcev1alpha1.ResourceClaim: + if updates.claim != nil { + obj = updates.claim(in) + } + case *resourcev1alpha1.PodScheduling: + if updates.scheduling != nil { + obj = updates.scheduling(in) + } + } + updated = append(updated, obj) + } + + return updated +} + +func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha1.ResourceClaim, classes []*resourcev1alpha1.ResourceClass, schedulings []*resourcev1alpha1.PodScheduling) (result *testContext) { + t.Helper() + + tc := &testContext{} + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + tc.ctx = ctx + + tc.client = fake.NewSimpleClientset() + reactor := createReactor(tc.client.Tracker()) + tc.client.PrependReactor("*", "*", reactor) + + informerFactory := informers.NewSharedInformerFactory(tc.client, 0) + + opts := []runtime.Option{ + runtime.WithClientSet(tc.client), + runtime.WithInformerFactory(informerFactory), + } + fh, err := runtime.NewFramework(nil, nil, tc.ctx.Done(), opts...) + if err != nil { + t.Fatal(err) + } + + pl, err := New(nil, fh, feature.Features{EnableDynamicResourceAllocation: true}) + if err != nil { + t.Fatal(err) + } + tc.p = pl.(*dynamicResources) + + // The tests use the API to create the objects because then reactors + // get triggered. + for _, claim := range claims { + _, err := tc.client.ResourceV1alpha1().ResourceClaims(claim.Namespace).Create(tc.ctx, claim, metav1.CreateOptions{}) + require.NoError(t, err, "create resource claim") + } + for _, class := range classes { + _, err := tc.client.ResourceV1alpha1().ResourceClasses().Create(tc.ctx, class, metav1.CreateOptions{}) + require.NoError(t, err, "create resource class") + } + for _, scheduling := range schedulings { + _, err := tc.client.ResourceV1alpha1().PodSchedulings(scheduling.Namespace).Create(tc.ctx, scheduling, metav1.CreateOptions{}) + require.NoError(t, err, "create pod scheduling") + } + + informerFactory.Start(tc.ctx.Done()) + t.Cleanup(func() { + // Need to cancel before waiting for the shutdown. + cancel() + // Now we can wait for all goroutines to stop. + informerFactory.Shutdown() + }) + + informerFactory.WaitForCacheSync(tc.ctx.Done()) + + for _, node := range nodes { + nodeInfo := framework.NewNodeInfo() + nodeInfo.SetNode(node) + tc.nodeInfos = append(tc.nodeInfos, nodeInfo) + } + tc.state = framework.NewCycleState() + + return tc +} + +// createReactor implements the logic required for the UID and ResourceVersion +// fields to work when using the fake client. Add it with client.PrependReactor +// to your fake client. ResourceVersion handling is required for conflict +// detection during updates, which is covered by some scenarios. 
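+//
+// Minimal usage, mirroring what setup does above:
+//
+//	client := fake.NewSimpleClientset()
+//	client.PrependReactor("*", "*", createReactor(client.Tracker()))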
+func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Action) (handled bool, ret apiruntime.Object, err error) { + var uidCounter int + var resourceVersionCounter int + var mutex sync.Mutex + + return func(action cgotesting.Action) (handled bool, ret apiruntime.Object, err error) { + createAction, ok := action.(cgotesting.CreateAction) + if !ok { + return false, nil, nil + } + obj, ok := createAction.GetObject().(metav1.Object) + if !ok { + return false, nil, nil + } + + mutex.Lock() + defer mutex.Unlock() + switch action.GetVerb() { + case "create": + if obj.GetUID() != "" { + return true, nil, errors.New("UID must not be set on create") + } + if obj.GetResourceVersion() != "" { + return true, nil, errors.New("ResourceVersion must not be set on create") + } + obj.SetUID(types.UID(fmt.Sprintf("UID-%d", uidCounter))) + uidCounter++ + obj.SetResourceVersion(fmt.Sprintf("REV-%d", resourceVersionCounter)) + resourceVersionCounter++ + case "update": + uid := obj.GetUID() + resourceVersion := obj.GetResourceVersion() + if uid == "" { + return true, nil, errors.New("UID must be set on update") + } + if resourceVersion == "" { + return true, nil, errors.New("ResourceVersion must be set on update") + } + + oldObj, err := tracker.Get(action.GetResource(), obj.GetNamespace(), obj.GetName()) + if err != nil { + return true, nil, err + } + oldObjMeta, ok := oldObj.(metav1.Object) + if !ok { + return true, nil, errors.New("internal error: unexpected old object type") + } + if oldObjMeta.GetResourceVersion() != resourceVersion { + return true, nil, errors.New("ResourceVersion must match the object that gets updated") + } + + obj.SetResourceVersion(fmt.Sprintf("REV-%d", resourceVersionCounter)) + resourceVersionCounter++ + } + return false, nil, nil + } +} diff --git a/pkg/scheduler/framework/plugins/feature/feature.go b/pkg/scheduler/framework/plugins/feature/feature.go index 48cd00b7abe..e0790a681ca 100644 --- a/pkg/scheduler/framework/plugins/feature/feature.go +++ b/pkg/scheduler/framework/plugins/feature/feature.go @@ -20,6 +20,7 @@ package feature // This struct allows us to break the dependency of the plugins on // the internal k8s features pkg. 
type Features struct { + EnableDynamicResourceAllocation bool EnableReadWriteOncePod bool EnableVolumeCapacityPriority bool EnableMinDomainsInPodTopologySpread bool diff --git a/pkg/scheduler/framework/plugins/names/names.go b/pkg/scheduler/framework/plugins/names/names.go index 659a5ab4073..86dff2d25d2 100644 --- a/pkg/scheduler/framework/plugins/names/names.go +++ b/pkg/scheduler/framework/plugins/names/names.go @@ -20,6 +20,7 @@ const ( PrioritySort = "PrioritySort" DefaultBinder = "DefaultBinder" DefaultPreemption = "DefaultPreemption" + DynamicResources = "DynamicResources" ImageLocality = "ImageLocality" InterPodAffinity = "InterPodAffinity" NodeAffinity = "NodeAffinity" diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index fdd1334aaae..8881e58e4b3 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -21,6 +21,7 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources" plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" @@ -46,6 +47,7 @@ import ( // through the WithFrameworkOutOfTreeRegistry option. func NewInTreeRegistry() runtime.Registry { fts := plfeature.Features{ + EnableDynamicResourceAllocation: feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation), EnableReadWriteOncePod: feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod), EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority), EnableMinDomainsInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread), @@ -54,7 +56,8 @@ func NewInTreeRegistry() runtime.Registry { EnablePodSchedulingReadiness: feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness), } - return runtime.Registry{ + registry := runtime.Registry{ + dynamicresources.Name: runtime.FactoryAdapter(fts, dynamicresources.New), selectorspread.Name: selectorspread.New, imagelocality.Name: imagelocality.New, tainttoleration.Name: tainttoleration.New, @@ -78,4 +81,6 @@ func NewInTreeRegistry() runtime.Registry { defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New), schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New), } + + return registry } diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 420aa8a73d5..8e647fea466 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -64,6 +64,8 @@ const ( Node GVK = "Node" PersistentVolume GVK = "PersistentVolume" PersistentVolumeClaim GVK = "PersistentVolumeClaim" + PodScheduling GVK = "PodScheduling" + ResourceClaim GVK = "ResourceClaim" StorageClass GVK = "storage.k8s.io/StorageClass" CSINode GVK = "storage.k8s.io/CSINode" CSIDriver GVK = "storage.k8s.io/CSIDriver" diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index 991ba9f1250..f632c42ebd7 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -20,6 +20,7 @@ import ( "fmt" v1 "k8s.io/api/core/v1" + resourcev1alpha1 "k8s.io/api/resource/v1alpha1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
"k8s.io/apimachinery/pkg/runtime/schema" @@ -249,6 +250,12 @@ func (p *PodWrapper) Containers(containers []v1.Container) *PodWrapper { return p } +// PodResourceClaims appends PodResourceClaims into PodSpec of the inner pod. +func (p *PodWrapper) PodResourceClaims(podResourceClaims ...v1.PodResourceClaim) *PodWrapper { + p.Spec.ResourceClaims = append(p.Spec.ResourceClaims, podResourceClaims...) + return p +} + // Priority sets a priority value into PodSpec of the inner pod. func (p *PodWrapper) Priority(val int32) *PodWrapper { p.Spec.Priority = &val @@ -791,3 +798,160 @@ func (p *PersistentVolumeWrapper) HostPathVolumeSource(src *v1.HostPathVolumeSou p.PersistentVolume.Spec.HostPath = src return p } + +// ResourceClaimWrapper wraps a ResourceClaim inside. +type ResourceClaimWrapper struct{ resourcev1alpha1.ResourceClaim } + +// MakeResourceClaim creates a ResourceClaim wrapper. +func MakeResourceClaim() *ResourceClaimWrapper { + return &ResourceClaimWrapper{resourcev1alpha1.ResourceClaim{}} +} + +// FromResourceClaim creates a ResourceClaim wrapper from some existing object. +func FromResourceClaim(other *resourcev1alpha1.ResourceClaim) *ResourceClaimWrapper { + return &ResourceClaimWrapper{*other.DeepCopy()} +} + +// Obj returns the inner ResourceClaim. +func (wrapper *ResourceClaimWrapper) Obj() *resourcev1alpha1.ResourceClaim { + return &wrapper.ResourceClaim +} + +// Name sets `s` as the name of the inner object. +func (wrapper *ResourceClaimWrapper) Name(s string) *ResourceClaimWrapper { + wrapper.SetName(s) + return wrapper +} + +// UID sets `s` as the UID of the inner object. +func (wrapper *ResourceClaimWrapper) UID(s string) *ResourceClaimWrapper { + wrapper.SetUID(types.UID(s)) + return wrapper +} + +// Namespace sets `s` as the namespace of the inner object. +func (wrapper *ResourceClaimWrapper) Namespace(s string) *ResourceClaimWrapper { + wrapper.SetNamespace(s) + return wrapper +} + +// OwnerReference updates the owning controller of the object. +func (wrapper *ResourceClaimWrapper) OwnerReference(name, uid string, gvk schema.GroupVersionKind) *ResourceClaimWrapper { + wrapper.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: gvk.GroupVersion().String(), + Kind: gvk.Kind, + Name: name, + UID: types.UID(uid), + Controller: pointer.Bool(true), + }, + } + return wrapper +} + +// AllocationMode sets the allocation mode of the inner object. +func (wrapper *ResourceClaimWrapper) AllocationMode(a resourcev1alpha1.AllocationMode) *ResourceClaimWrapper { + wrapper.ResourceClaim.Spec.AllocationMode = a + return wrapper +} + +// ResourceClassName sets the resource class name of the inner object. +func (wrapper *ResourceClaimWrapper) ResourceClassName(name string) *ResourceClaimWrapper { + wrapper.ResourceClaim.Spec.ResourceClassName = name + return wrapper +} + +// Allocation sets the allocation of the inner object. +func (wrapper *ResourceClaimWrapper) Allocation(allocation *resourcev1alpha1.AllocationResult) *ResourceClaimWrapper { + wrapper.ResourceClaim.Status.Allocation = allocation + return wrapper +} + +// DeallocationRequested sets that field of the inner object. +func (wrapper *ResourceClaimWrapper) DeallocationRequested(deallocationRequested bool) *ResourceClaimWrapper { + wrapper.ResourceClaim.Status.DeallocationRequested = deallocationRequested + return wrapper +} + +// ReservedFor sets that field of the inner object. 
+func (wrapper *ResourceClaimWrapper) ReservedFor(consumers ...resourcev1alpha1.ResourceClaimConsumerReference) *ResourceClaimWrapper { + wrapper.ResourceClaim.Status.ReservedFor = consumers + return wrapper +} + +// PodSchedulingWrapper wraps a PodScheduling inside. +type PodSchedulingWrapper struct{ resourcev1alpha1.PodScheduling } + +// MakePodScheduling creates a PodScheduling wrapper. +func MakePodScheduling() *PodSchedulingWrapper { + return &PodSchedulingWrapper{resourcev1alpha1.PodScheduling{}} +} + +// FromPodScheduling creates a PodScheduling wrapper from some existing object. +func FromPodScheduling(other *resourcev1alpha1.PodScheduling) *PodSchedulingWrapper { + return &PodSchedulingWrapper{*other.DeepCopy()} +} + +// Obj returns the inner object. +func (wrapper *PodSchedulingWrapper) Obj() *resourcev1alpha1.PodScheduling { + return &wrapper.PodScheduling +} + +// Name sets `s` as the name of the inner object. +func (wrapper *PodSchedulingWrapper) Name(s string) *PodSchedulingWrapper { + wrapper.SetName(s) + return wrapper +} + +// UID sets `s` as the UID of the inner object. +func (wrapper *PodSchedulingWrapper) UID(s string) *PodSchedulingWrapper { + wrapper.SetUID(types.UID(s)) + return wrapper +} + +// Namespace sets `s` as the namespace of the inner object. +func (wrapper *PodSchedulingWrapper) Namespace(s string) *PodSchedulingWrapper { + wrapper.SetNamespace(s) + return wrapper +} + +// OwnerReference updates the owning controller of the inner object. +func (wrapper *PodSchedulingWrapper) OwnerReference(name, uid string, gvk schema.GroupVersionKind) *PodSchedulingWrapper { + wrapper.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: gvk.GroupVersion().String(), + Kind: gvk.Kind, + Name: name, + UID: types.UID(uid), + Controller: pointer.Bool(true), + }, + } + return wrapper +} + +// Label applies a {k,v} label pair to the inner object +func (wrapper *PodSchedulingWrapper) Label(k, v string) *PodSchedulingWrapper { + if wrapper.Labels == nil { + wrapper.Labels = make(map[string]string) + } + wrapper.Labels[k] = v + return wrapper +} + +// SelectedNode sets that field of the inner object. +func (wrapper *PodSchedulingWrapper) SelectedNode(s string) *PodSchedulingWrapper { + wrapper.Spec.SelectedNode = s + return wrapper +} + +// PotentialNodes sets that field of the inner object. +func (wrapper *PodSchedulingWrapper) PotentialNodes(nodes ...string) *PodSchedulingWrapper { + wrapper.Spec.PotentialNodes = nodes + return wrapper +} + +// ResourceClaims sets that field of the inner object. +func (wrapper *PodSchedulingWrapper) ResourceClaims(statuses ...resourcev1alpha1.ResourceClaimSchedulingStatus) *PodSchedulingWrapper { + wrapper.Status.ResourceClaims = statuses + return wrapper +} diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 13266214bba..882c8b49b0a 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -564,6 +564,15 @@ func ClusterRoles() []rbacv1.ClusterRole { rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csidrivers").RuleOrDie(), rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csistoragecapacities").RuleOrDie(), } + // Needed for dynamic resource allocation. 
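+	// The scheduler must be able to read ResourceClaims and ResourceClasses to
+	// make scheduling decisions, update the ResourceClaim status to reserve
+	// claims for pods, and create and update PodScheduling objects to exchange
+	// potential and selected nodes with the resource drivers.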
+ if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + kubeSchedulerRules = append(kubeSchedulerRules, + rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("resourceclaims", "resourceclasses").RuleOrDie(), + rbacv1helpers.NewRule(ReadUpdate...).Groups(resourceGroup).Resources("resourceclaims/status").RuleOrDie(), + rbacv1helpers.NewRule(ReadWrite...).Groups(resourceGroup).Resources("podschedulings").RuleOrDie(), + rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("podschedulings/status").RuleOrDie(), + ) + } roles = append(roles, rbacv1.ClusterRole{ // a role to use for the kube-scheduler ObjectMeta: metav1.ObjectMeta{Name: "system:kube-scheduler"},