dra scheduler: support structured parameters

When a claim uses structured parameters, as indicated by the resource class
flag, the scheduler is responsible for allocating it. To do this it needs to
gather information about available node resources by watching
NodeResourceSlices and then match the in-tree claim parameters against those
resources.
This commit is contained in:
Patrick Ohly 2024-02-16 11:50:48 +01:00
parent a92d2a4cea
commit 096e948905
8 changed files with 799 additions and 95 deletions

View File

@ -472,6 +472,24 @@ func addAllEventHandlers(
}
handlers = append(handlers, handlerRegistration)
}
case framework.ResourceClaimParameters:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaimParameters().Informer().AddEventHandler(
buildEvtResHandler(at, framework.ResourceClaimParameters, "ResourceClaimParameters"),
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
}
case framework.ResourceClassParameters:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClassParameters().Informer().AddEventHandler(
buildEvtResHandler(at, framework.ResourceClassParameters, "ResourceClassParameters"),
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
}
case framework.StorageClass:
if at&framework.Add != 0 {
if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(

View File

@ -18,8 +18,10 @@ package dynamicresources
import (
"context"
"encoding/json"
"errors"
"fmt"
"slices"
"sort"
"sync"
@ -30,6 +32,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
@ -43,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/utils/ptr"
)
@ -70,37 +74,52 @@ type stateData struct {
// Empty if the Pod has no claims.
claims []*resourcev1alpha2.ResourceClaim
// podSchedulingState keeps track of the PodSchedulingContext
// (if one exists) and the changes made to it.
podSchedulingState podSchedulingState
// resourceModel contains the information about available and allocated resources when using
// structured parameters and the pod needs this information.
resources resources
// mutex must be locked while accessing any of the fields below.
mutex sync.Mutex
// The indices of all claims that:
// - are allocated
// - use delayed allocation
// - use delayed allocation or the builtin controller
// - were not available on at least one node
//
// Set in parallel during Filter, so write access there must be
// protected by the mutex. Used by PostFilter.
unavailableClaims sets.Set[int]
// podSchedulingState keeps track of the PodSchedulingContext
// (if one exists) and the changes made to it.
podSchedulingState podSchedulingState
mutex sync.Mutex
informationsForClaim []informationForClaim
}
func (d *stateData) Clone() framework.StateData {
return d
}
type informationForClaim struct {
// The availableOnNode node filter of the claim converted from the
// v1 API to nodeaffinity.NodeSelector by PreFilter for repeated
// evaluation in Filter. Nil for claim which don't have it.
availableOnNode *nodeaffinity.NodeSelector
// The status of the claim got from the
// schedulingCtx by PreFilter for repeated
// evaluation in Filter. Nil for claim which don't have it.
status *resourcev1alpha2.ResourceClaimSchedulingStatus
}
func (d *stateData) Clone() framework.StateData {
return d
// structuredParameters is true if the claim is handled via the builtin
// controller.
structuredParameters bool
controller *claimController
// Set by Reserved, published by PreBind.
allocation *resourcev1alpha2.AllocationResult
allocationDriverName string
}
type podSchedulingState struct {
@ -256,23 +275,90 @@ type dynamicResources struct {
claimLister resourcev1alpha2listers.ResourceClaimLister
classLister resourcev1alpha2listers.ResourceClassLister
podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister
claimParametersLister resourcev1alpha2listers.ResourceClaimParametersLister
classParametersLister resourcev1alpha2listers.ResourceClassParametersLister
nodeResourceSliceLister resourcev1alpha2listers.NodeResourceSliceLister
claimNameLookup *resourceclaim.Lookup
// claimAssumeCache enables temporarily storing a newer claim object
// while the scheduler has allocated it and the corresponding object
// update from the apiserver has not been processed by the claim
// informer callbacks. Claims get added here in Reserve and removed by
// the informer callback (based on the "newer than" comparison in the
// assume cache) or when the API call in PreBind fails.
//
// It uses cache.MetaNamespaceKeyFunc to generate object names, which
// therefore are "<namespace>/<name>".
//
// This is necessary to ensure that reconstructing the resource usage
// at the start of a pod scheduling cycle doesn't reuse the resources
// assigned to such a claim. Alternatively, claim allocation state
// could also get tracked across pod scheduling cycles, but that
// - adds complexity (need to carefully sync state with informer events
// for claims and NodeResourceSlices)
// - would make integration with cluster autoscaler harder because it would need
// to trigger informer callbacks.
//
// When implementing cluster autoscaler support, this assume cache or
// something like it (see https://github.com/kubernetes/kubernetes/pull/112202)
// might have to be managed by the cluster autoscaler.
claimAssumeCache volumebinding.AssumeCache
// inFlightAllocations is map from claim UUIDs to true for those claims
// for which allocation was triggered during a scheduling cycle and the
// corresponding claim status update call in PreBind has not been done
// yet. If another pod needs the claim, the pod is treated as "not
// schedulable yet". The cluster event for the claim status update will
// make it schedulable.
//
// This mechanism avoids the following problem:
// - Pod A triggers allocation for claim X.
// - Pod B shares access to that claim and gets scheduled because
// the claim is assumed to be allocated.
// - PreBind for pod B is called first, tries to update reservedFor and
// fails because the claim is not really allocated yet.
//
// We could avoid the ordering problem by allowing either pod A or pod B
// to set the allocation. But that is more complicated and leads to another
// problem:
// - Pod A and B get scheduled as above.
// - PreBind for pod A gets called first, then fails with a temporary API error.
// It removes the updated claim from the assume cache because of that.
// - PreBind for pod B gets called next and succeeds with adding the
// allocation and its own reservedFor entry.
// - The assume cache is now not reflecting that the claim is allocated,
// which could lead to reusing the same resource for some other claim.
//
// A sync.Map is used because in practice sharing of a claim between
// pods is expected to be rare compared to per-pod claim, so we end up
// hitting the "multiple goroutines read, write, and overwrite entries
// for disjoint sets of keys" case that sync.Map is optimized for.
inFlightAllocations sync.Map
}
// New initializes a new plugin and returns it.
func New(_ context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
if !fts.EnableDynamicResourceAllocation {
// Disabled, won't do anything.
return &dynamicResources{}, nil
}
return &dynamicResources{
logger := klog.FromContext(ctx)
pl := &dynamicResources{
enabled: true,
fh: fh,
clientset: fh.ClientSet(),
claimLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
classLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
}, nil
claimParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(),
classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
nodeResourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().NodeResourceSlices().Lister(),
claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()),
claimAssumeCache: volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
}
return pl, nil
}
var _ framework.PreEnqueuePlugin = &dynamicResources{}
@ -296,7 +382,13 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint
if !pl.enabled {
return nil
}
events := []framework.ClusterEventWithHint{
// Changes for claim or class parameters creation may make pods
// schedulable which depend on claims using those parameters.
{Event: framework.ClusterEvent{Resource: framework.ResourceClaimParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimParametersChange},
{Event: framework.ClusterEvent{Resource: framework.ResourceClassParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClassParametersChange},
// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
// When a driver has provided additional information, a pod waiting for that information
@ -321,6 +413,149 @@ func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status
return nil
}
// isSchedulableAfterClaimParametersChange is invoked for add and update claim parameters events reported by
// an informer. It checks whether that change made a previously unschedulable
// pod schedulable. It errs on the side of letting a pod scheduling attempt
// happen. The delete claim event will not invoke it, so newObj will never be nil.
func (pl *dynamicResources) isSchedulableAfterClaimParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
originalParameters, modifiedParameters, err := schedutil.As[*resourcev1alpha2.ResourceClaimParameters](oldObj, newObj)
if err != nil {
// Shouldn't happen.
return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimParametersChange: %w", err)
}
usesParameters := false
if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
ref := claim.Spec.ParametersRef
if ref == nil {
return
}
// Using in-tree parameters directly?
if ref.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
ref.Kind == "ResourceClaimParameters" {
if modifiedParameters.Name == ref.Name {
usesParameters = true
}
return
}
// Need to look for translated parameters.
generatedFrom := modifiedParameters.GeneratedFrom
if generatedFrom == nil {
return
}
if generatedFrom.APIGroup == ref.APIGroup &&
generatedFrom.Kind == ref.Kind &&
generatedFrom.Name == ref.Name {
usesParameters = true
}
}); err != nil {
// This is not an unexpected error: we know that
// foreachPodResourceClaim only returns errors for "not
// schedulable".
logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedParameters), "reason", err.Error())
return framework.QueueSkip, nil
}
if !usesParameters {
// This were not the parameters the pod was waiting for.
logger.V(6).Info("unrelated claim parameters got modified", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
return framework.QueueSkip, nil
}
if originalParameters == nil {
logger.V(4).Info("claim parameters for pod got created", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
return framework.Queue, nil
}
// Modifications may or may not be relevant. If the entire
// requests are as before, then something else must have changed
// and we don't care.
if apiequality.Semantic.DeepEqual(&originalParameters.DriverRequests, &modifiedParameters.DriverRequests) {
logger.V(6).Info("claim parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
return framework.QueueSkip, nil
}
logger.V(4).Info("requests in claim parameters for pod got updated", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
return framework.Queue, nil
}
// isSchedulableAfterClassParametersChange is invoked for add and update class parameters events reported by
// an informer. It checks whether that change made a previously unschedulable
// pod schedulable. It errs on the side of letting a pod scheduling attempt
// happen. The delete class event will not invoke it, so newObj will never be nil.
func (pl *dynamicResources) isSchedulableAfterClassParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
originalParameters, modifiedParameters, err := schedutil.As[*resourcev1alpha2.ResourceClassParameters](oldObj, newObj)
if err != nil {
// Shouldn't happen.
return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClassParametersChange: %w", err)
}
usesParameters := false
if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
class, err := pl.classLister.Get(claim.Spec.ResourceClassName)
if err != nil {
if !apierrors.IsNotFound(err) {
logger.Error(err, "look up resource class")
}
return
}
ref := class.ParametersRef
if ref == nil {
return
}
// Using in-tree parameters directly?
if ref.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
ref.Kind == "ResourceClassParameters" {
if modifiedParameters.Name == ref.Name {
usesParameters = true
}
return
}
// Need to look for translated parameters.
generatedFrom := modifiedParameters.GeneratedFrom
if generatedFrom == nil {
return
}
if generatedFrom.APIGroup == ref.APIGroup &&
generatedFrom.Kind == ref.Kind &&
generatedFrom.Name == ref.Name {
usesParameters = true
}
}); err != nil {
// This is not an unexpected error: we know that
// foreachPodResourceClaim only returns errors for "not
// schedulable".
logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters), "reason", err.Error())
return framework.QueueSkip, nil
}
if !usesParameters {
// This were not the parameters the pod was waiting for.
logger.V(6).Info("unrelated class parameters got modified", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
return framework.QueueSkip, nil
}
if originalParameters == nil {
logger.V(4).Info("class parameters for pod got created", "pod", klog.KObj(pod), "class", klog.KObj(modifiedParameters))
return framework.Queue, nil
}
// Modifications may or may not be relevant. If the entire
// requests are as before, then something else must have changed
// and we don't care.
if apiequality.Semantic.DeepEqual(&originalParameters.Filters, &modifiedParameters.Filters) {
logger.V(6).Info("class parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
return framework.QueueSkip, nil
}
logger.V(4).Info("filters in class parameters for pod got updated", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
return framework.Queue, nil
}
// isSchedulableAfterClaimChange is invoked for add and update claim events reported by
// an informer. It checks whether that change made a previously unschedulable
// pod schedulable. It errs on the side of letting a pod scheduling attempt
@ -345,6 +580,33 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
return framework.QueueSkip, nil
}
if originalClaim != nil &&
resourceclaim.IsAllocatedWithStructuredParameters(originalClaim) &&
modifiedClaim.Status.Allocation == nil {
// A claim with structured parameters was deallocated. This might have made
// resources available for other pods.
//
// TODO (https://github.com/kubernetes/kubernetes/issues/123697):
// check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO).
//
// There is a small race here:
// - The dynamicresources plugin allocates claim A and updates the assume cache.
// - A second pod gets marked as unschedulable based on that assume cache.
// - Before the informer cache here catches up, the pod runs, terminates and
// the claim gets deallocated without ever sending the claim status with
// allocation to the scheduler.
// - The comparison below is for a *very* old claim with no allocation and the
// new claim where the allocation is already removed again, so no
// RemovedClaimAllocation event gets emitted.
//
// This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30.
// TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache
// into the event mechanism. This can be tackled together with adding autoscaler
// support, which also needs to do something with the assume cache.
logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
return framework.Queue, nil
}
if !usesClaim {
// This was not the claim the pod was waiting for.
logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
@ -526,7 +788,7 @@ func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2.
// It calls an optional handler for those claims that it finds.
func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podResourceName string, claim *resourcev1alpha2.ResourceClaim)) error {
for _, resource := range pod.Spec.ResourceClaims {
claimName, mustCheckOwner, err := resourceclaim.Name(pod, &resource)
claimName, mustCheckOwner, err := pl.claimNameLookup.Name(pod, &resource)
if err != nil {
return err
}
@ -578,24 +840,21 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
return nil, statusUnschedulable(logger, err.Error())
}
logger.V(5).Info("pod resource claims", "pod", klog.KObj(pod), "resourceclaims", klog.KObjSlice(claims))
// If the pod does not reference any claim,
// DynamicResources Filter has nothing to do with the Pod.
if len(claims) == 0 {
return nil, framework.NewStatus(framework.Skip)
}
// Fetch s.podSchedulingState.schedulingCtx, it's going to be needed when checking claims.
// Fetch PodSchedulingContext, it's going to be needed when checking claims.
if err := s.podSchedulingState.init(ctx, pod, pl.podSchedulingContextLister); err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim = make([]informationForClaim, len(claims))
needResourceInformation := false
for index, claim := range claims {
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate &&
claim.Status.Allocation == nil {
// This will get resolved by the resource driver.
return nil, statusUnschedulable(logger, "unallocated immediate resourceclaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
}
if claim.Status.DeallocationRequested {
// This will get resolved by the resource driver.
return nil, statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
@ -606,16 +865,20 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
// Resource is in use. The pod has to wait.
return nil, statusUnschedulable(logger, "resourceclaim in use", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
}
if claim.Status.Allocation != nil &&
claim.Status.Allocation.AvailableOnNodes != nil {
nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes)
if err != nil {
return nil, statusError(logger, err)
if claim.Status.Allocation != nil {
if claim.Status.Allocation.AvailableOnNodes != nil {
nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes)
if err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim[index].availableOnNode = nodeSelector
}
s.informationsForClaim[index].availableOnNode = nodeSelector
}
if claim.Status.Allocation == nil &&
claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer {
// The claim was allocated by the scheduler if it has the finalizer that is
// reserved for Kubernetes.
s.informationsForClaim[index].structuredParameters = slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer)
} else {
// The ResourceClass might have a node filter. This is
// useful for trimming the initial set of potential
// nodes before we ask the driver(s) for information
@ -638,16 +901,140 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
}
s.informationsForClaim[index].availableOnNode = selector
}
// Now we need information from drivers.
s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name)
if class.StructuredParameters != nil && *class.StructuredParameters {
s.informationsForClaim[index].structuredParameters = true
// Allocation in flight? Better wait for that
// to finish, see inFlightAllocations
// documentation for details.
if _, found := pl.inFlightAllocations.Load(claim.UID); found {
return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim)))
}
// We need the claim and class parameters. If
// they don't exist yet, the pod has to wait.
//
// TODO (https://github.com/kubernetes/kubernetes/issues/123697):
// check this already in foreachPodResourceClaim, together with setting up informationsForClaim.
// Then PreEnqueue will also check for existence of parameters.
classParameters, claimParameters, status := pl.lookupParameters(logger, class, claim)
if status != nil {
return nil, status
}
controller, err := newClaimController(logger, class, classParameters, claimParameters)
if err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim[index].controller = controller
needResourceInformation = true
} else if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate {
// This will get resolved by the resource driver.
return nil, statusUnschedulable(logger, "unallocated immediate resourceclaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
}
}
}
if needResourceInformation {
// Doing this over and over again for each pod could be avoided
// by parsing once when creating the plugin and then updating
// that state in informer callbacks. But that would cause
// problems for using the plugin in the Cluster Autoscaler. If
// this step here turns out to be expensive, we may have to
// maintain and update state more persistently.
resources, err := newResourceModel(logger, pl.nodeResourceSliceLister, pl.claimAssumeCache)
if err != nil {
return nil, statusError(logger, err)
}
s.resources = resources
}
s.claims = claims
state.Write(stateKey, s)
return nil, nil
}
func (pl *dynamicResources) lookupParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass, claim *resourcev1alpha2.ResourceClaim) (classParameters *resourcev1alpha2.ResourceClassParameters, claimParameters *resourcev1alpha2.ResourceClaimParameters, status *framework.Status) {
classParameters, status = pl.lookupClassParameters(logger, class)
if status != nil {
return
}
claimParameters, status = pl.lookupClaimParameters(logger, claim)
return
}
func (pl *dynamicResources) lookupClassParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass) (*resourcev1alpha2.ResourceClassParameters, *framework.Status) {
if class.ParametersRef == nil {
return nil, nil
}
if class.ParametersRef.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
class.ParametersRef.Kind == "ResourceClassParameters" {
// Use the parameters which were referenced directly.
parameters, err := pl.classParametersLister.ResourceClassParameters(class.ParametersRef.Namespace).Get(class.ParametersRef.Name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, statusUnschedulable(logger, fmt.Sprintf("class parameters %s not found", klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name)))
}
return nil, statusError(logger, fmt.Errorf("get class parameters %s: %v", klog.KRef(class.Namespace, class.ParametersRef.Name), err))
}
return parameters, nil
}
// TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer
allParameters, err := pl.classParametersLister.ResourceClassParameters(class.Namespace).List(labels.Everything())
if err != nil {
return nil, statusError(logger, fmt.Errorf("listing class parameters failed: %v", err))
}
for _, parameters := range allParameters {
if parameters.GeneratedFrom == nil {
continue
}
if parameters.GeneratedFrom.APIGroup == class.ParametersRef.APIGroup &&
parameters.GeneratedFrom.Kind == class.ParametersRef.Kind &&
parameters.GeneratedFrom.Name == class.ParametersRef.Name &&
parameters.GeneratedFrom.Namespace == class.ParametersRef.Namespace {
return parameters, nil
}
}
return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name)))
}
func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, claim *resourcev1alpha2.ResourceClaim) (*resourcev1alpha2.ResourceClaimParameters, *framework.Status) {
if claim.Spec.ParametersRef == nil {
return nil, nil
}
if claim.Spec.ParametersRef.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
claim.Spec.ParametersRef.Kind == "ResourceClaimParameters" {
// Use the parameters which were referenced directly.
parameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).Get(claim.Spec.ParametersRef.Name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, statusUnschedulable(logger, fmt.Sprintf("claim parameters %s not found", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name)))
}
return nil, statusError(logger, fmt.Errorf("get claim parameters %s: %v", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name), err))
}
return parameters, nil
}
// TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer
allParameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).List(labels.Everything())
if err != nil {
return nil, statusError(logger, fmt.Errorf("listing claim parameters failed: %v", err))
}
for _, parameters := range allParameters {
if parameters.GeneratedFrom == nil {
continue
}
if parameters.GeneratedFrom.APIGroup == claim.Spec.ParametersRef.APIGroup &&
parameters.GeneratedFrom.Kind == claim.Spec.ParametersRef.Kind &&
parameters.GeneratedFrom.Name == claim.Spec.ParametersRef.Name {
return parameters, nil
}
}
return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name)))
}
// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (pl *dynamicResources) PreFilterExtensions() framework.PreFilterExtensions {
return nil
@ -703,22 +1090,39 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState
case claim.Status.DeallocationRequested:
// We shouldn't get here. PreFilter already checked this.
return statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
case claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer:
case claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer ||
state.informationsForClaim[index].structuredParameters:
if selector := state.informationsForClaim[index].availableOnNode; selector != nil {
if matches := selector.Match(node); !matches {
return statusUnschedulable(logger, "excluded by resource class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclassName", claim.Spec.ResourceClassName)
}
}
if status := state.informationsForClaim[index].status; status != nil {
for _, unsuitableNode := range status.UnsuitableNodes {
if node.Name == unsuitableNode {
return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes)
// Can the builtin controller tell us whether the node is suitable?
if state.informationsForClaim[index].structuredParameters {
suitable, err := state.informationsForClaim[index].controller.nodeIsSuitable(ctx, node.Name, state.resources)
if err != nil {
// An error indicates that something wasn't configured correctly, for example
// writing a CEL expression which doesn't handle a map lookup error. Normally
// this should never fail. We could return an error here, but then the pod
// would get retried. Instead we ignore the node.
return statusUnschedulable(logger, fmt.Sprintf("checking structured parameters failed: %v", err), "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
}
if !suitable {
return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
}
} else {
if status := state.informationsForClaim[index].status; status != nil {
for _, unsuitableNode := range status.UnsuitableNodes {
if node.Name == unsuitableNode {
return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes)
}
}
}
}
default:
// This should have been delayed allocation. Immediate
// allocation was already checked for in PreFilter.
// This claim should have been handled above.
// Immediate allocation with control plane controller
// was already checked for in PreFilter.
return statusError(logger, fmt.Errorf("internal error, unexpected allocation mode %v", claim.Spec.AllocationMode))
}
}
@ -736,7 +1140,11 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState
// delayed allocation. Claims with immediate allocation
// would just get allocated again for a random node,
// which is unlikely to help the pod.
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer {
//
// Claims with builtin controller are handled like
// claims with delayed allocation.
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer ||
state.informationsForClaim[index].controller != nil {
state.unavailableClaims.Insert(index)
}
}
@ -769,12 +1177,19 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS
claim := state.claims[index]
if len(claim.Status.ReservedFor) == 0 ||
len(claim.Status.ReservedFor) == 1 && claim.Status.ReservedFor[0].UID == pod.UID {
// Is the claim is handled by the builtin controller?
// Then we can simply clear the allocation. Once the
// claim informer catches up, the controllers will
// be notified about this change.
clearAllocation := state.informationsForClaim[index].controller != nil
// Before we tell a driver to deallocate a claim, we
// have to stop telling it to allocate. Otherwise,
// depending on timing, it will deallocate the claim,
// see a PodSchedulingContext with selected node, and
// allocate again for that same node.
if state.podSchedulingState.schedulingCtx != nil &&
if !clearAllocation &&
state.podSchedulingState.schedulingCtx != nil &&
state.podSchedulingState.schedulingCtx.Spec.SelectedNode != "" {
state.podSchedulingState.selectedNode = ptr.To("")
if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
@ -782,9 +1197,13 @@ func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleS
}
}
claim := state.claims[index].DeepCopy()
claim.Status.DeallocationRequested = true
claim := claim.DeepCopy()
claim.Status.ReservedFor = nil
if clearAllocation {
claim.Status.Allocation = nil
} else {
claim.Status.DeallocationRequested = true
}
logger.V(5).Info("Requesting deallocation of ResourceClaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
return nil, statusError(logger, err)
@ -815,14 +1234,15 @@ func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleSta
logger := klog.FromContext(ctx)
pending := false
for _, claim := range state.claims {
if claim.Status.Allocation == nil {
for index, claim := range state.claims {
if claim.Status.Allocation == nil &&
state.informationsForClaim[index].controller == nil {
pending = true
break
}
}
if !pending {
logger.V(5).Info("no pending claims", "pod", klog.KObj(pod))
logger.V(5).Info("no pending claims with control plane controller", "pod", klog.KObj(pod))
return nil
}
@ -889,7 +1309,7 @@ func haveNode(nodeNames []string, nodeName string) bool {
}
// Reserve reserves claims for the pod.
func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
if !pl.enabled {
return nil
}
@ -903,6 +1323,7 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
numDelayedAllocationPending := 0
numClaimsWithStatusInfo := 0
claimsWithBuiltinController := make([]int, 0, len(state.claims))
logger := klog.FromContext(ctx)
for index, claim := range state.claims {
if claim.Status.Allocation != nil {
@ -914,7 +1335,13 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
continue
}
// Must be delayed allocation.
// Do we have the builtin controller?
if state.informationsForClaim[index].controller != nil {
claimsWithBuiltinController = append(claimsWithBuiltinController, index)
continue
}
// Must be delayed allocation with control plane controller.
numDelayedAllocationPending++
// Did the driver provide information that steered node
@ -924,12 +1351,12 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
}
}
if numDelayedAllocationPending == 0 {
if numDelayedAllocationPending == 0 && len(claimsWithBuiltinController) == 0 {
// Nothing left to do.
return nil
}
if !state.preScored {
if !state.preScored && numDelayedAllocationPending > 0 {
// There was only one candidate that passed the Filters and
// therefore PreScore was not called.
//
@ -944,11 +1371,36 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
}
}
// Prepare allocation of claims handled by the schedulder.
for _, index := range claimsWithBuiltinController {
claim := state.claims[index]
driverName, allocation, err := state.informationsForClaim[index].controller.allocate(ctx, nodeName, state.resources)
if err != nil {
// We checked before that the node is suitable. This shouldn't have failed,
// so treat this as an error.
return statusError(logger, fmt.Errorf("claim allocation failed unexpectedly: %v", err))
}
state.informationsForClaim[index].allocation = allocation
state.informationsForClaim[index].allocationDriverName = driverName
pl.inFlightAllocations.Store(claim.UID, true)
claim = claim.DeepCopy()
claim.Status.DriverName = driverName
claim.Status.Allocation = allocation
if err := pl.claimAssumeCache.Assume(claim); err != nil {
return statusError(logger, fmt.Errorf("update claim assume cache: %v", err))
}
logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", allocation)
}
// When there is only one pending resource, we can go ahead with
// requesting allocation even when we don't have the information from
// the driver yet. Otherwise we wait for information before blindly
// making a decision that might have to be reversed later.
if numDelayedAllocationPending == 1 || numClaimsWithStatusInfo == numDelayedAllocationPending {
//
// If all pending claims are handled with the builtin controller,
// there is no need for a PodSchedulingContext change.
if numDelayedAllocationPending == 1 && len(claimsWithBuiltinController) == 0 ||
numClaimsWithStatusInfo+len(claimsWithBuiltinController) == numDelayedAllocationPending && len(claimsWithBuiltinController) < numDelayedAllocationPending {
// TODO: can we increase the chance that the scheduler picks
// the same node as before when allocation is on-going,
// assuming that that node still fits the pod? Picking a
@ -970,6 +1422,13 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
return nil
}
// If all pending claims are handled with the builtin controller, then
// we can allow the pod to proceed. Allocating and reserving the claims
// will be done in PreBind.
if numDelayedAllocationPending == 0 {
return nil
}
// More than one pending claim and not enough information about all of them.
//
// TODO: can or should we ensure that schedulingCtx gets aborted while
@ -1016,7 +1475,15 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt
}
}
for _, claim := range state.claims {
for index, claim := range state.claims {
// If allocation was in-flight, then it's not anymore and we need to revert the
// claim object in the assume cache to what it was before.
if state.informationsForClaim[index].controller != nil {
if _, found := pl.inFlightAllocations.LoadAndDelete(state.claims[index].UID); found {
pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)
}
}
if claim.Status.Allocation != nil &&
resourceclaim.IsReservedForPod(pod, claim) {
// Remove pod from ReservedFor. A strategic-merge-patch is used
@ -1038,7 +1505,10 @@ func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleSt
// PreBind gets called in a separate goroutine after it has been determined
// that the pod should get bound to this node. Because Reserve did not actually
// reserve claims, we need to do it now. If that fails, we return an error and
// reserve claims, we need to do it now. For claims with the builtin controller,
// we also handle the allocation.
//
// If anything fails, we return an error and
// the pod will have to go into the backoff queue. The scheduler will call
// Unreserve as part of the error handling.
func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
@ -1056,6 +1526,7 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat
logger := klog.FromContext(ctx)
// Was publishing delayed? If yes, do it now and then cause binding to stop.
// This will not happen if all claims get handled by builtin controllers.
if state.podSchedulingState.isDirty() {
if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
return statusError(logger, err)
@ -1065,23 +1536,7 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat
for index, claim := range state.claims {
if !resourceclaim.IsReservedForPod(pod, claim) {
// The claim might be stale, for example because the claim can get shared and some
// other goroutine has updated it in the meantime. We therefore cannot use
// SSA here to add the pod because then we would have to send the entire slice
// or use different field manager strings for each entry.
//
// With a strategic-merge-patch, we can simply send one new entry. The apiserver
// validation will catch if two goroutines try to do that at the same time and
// the claim cannot be shared.
patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": { "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`,
claim.UID,
pod.Name,
pod.UID,
)
logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim))
// TODO: metric for update errors.
claim, err := pl.bindClaim(ctx, state, index, pod, nodeName)
if err != nil {
return statusError(logger, err)
}
@ -1093,6 +1548,75 @@ func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleStat
return nil
}
// bindClaim gets called by PreBind for claim which is not reserved for the pod yet.
// It might not even be allocated. bindClaim then ensures that the allocation
// and reservation are recorded. This finishes the work started in Reserve.
func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (patchedClaim *resourcev1alpha2.ResourceClaim, finalErr error) {
logger := klog.FromContext(ctx)
claim := state.claims[index]
allocationPatch := ""
allocation := state.informationsForClaim[index].allocation
logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", allocation)
// Do we need to store an allocation result from Reserve?
if allocation != nil {
buffer, err := json.Marshal(allocation)
if err != nil {
return nil, fmt.Errorf("marshaling AllocationResult failed: %v", err)
}
allocationPatch = fmt.Sprintf(`"driverName": %q, "allocation": %s, `, state.informationsForClaim[index].allocationDriverName, string(buffer))
// The finalizer needs to be added in a normal update. Using a simple update is fine
// because we don't expect concurrent modifications while the claim is not allocated
// yet. If there are any, we want to fail.
//
// If we were interrupted in the past, it might already be set and we simply continue.
if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
claim := state.claims[index].DeepCopy()
claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
return nil, fmt.Errorf("add finalizer: %v", err)
}
}
}
// The claim might be stale, for example because the claim can get shared and some
// other goroutine has updated it in the meantime. We therefore cannot use
// SSA here to add the pod because then we would have to send the entire slice
// or use different field manager strings for each entry.
//
// With a strategic-merge-patch, we can simply send one new entry. The apiserver
// validation will catch if two goroutines try to do that at the same time and
// the claim cannot be shared.
//
// Note that this also works when the allocation result gets added twice because
// two pods both started using a shared claim: the first pod to get here adds the
// allocation result. The second pod then only adds itself to reservedFor.
patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": {%s "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`,
claim.UID,
allocationPatch,
pod.Name,
pod.UID,
)
if loggerV := logger.V(6); loggerV.Enabled() {
logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim), "patch", patch)
} else {
logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
}
claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim), "err", err)
if allocationPatch != "" {
// The scheduler was handling allocation. Now that has
// completed, either successfully or with a failure.
if err != nil {
pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)
}
pl.inFlightAllocations.Delete(claim.UID)
}
return claim, err
}
// PostBind is called after a pod is successfully bound to a node. Now we are
// sure that a PodSchedulingContext object, if it exists, is definitely not going to
// be needed anymore and can delete it. This is a one-shot thing, there won't

View File

@ -309,8 +309,9 @@ func TestPlugin(t *testing.T) {
},
},
"waiting-for-immediate-allocation": {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
classes: []*resourcev1alpha2.ResourceClass{resourceClass},
want: want{
prefilter: result{
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `unallocated immediate resourceclaim`),
@ -812,7 +813,6 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
tc.client = fake.NewSimpleClientset()
reactor := createReactor(tc.client.Tracker())
tc.client.PrependReactor("*", "*", reactor)
tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)
opts := []runtime.Option{

View File

@ -0,0 +1,134 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamicresources
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
"k8s.io/apimachinery/pkg/labels"
resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
)
// resources is a map "node name" -> "driver name" -> available and
// allocated resources per structured parameter model.
type resources map[string]map[string]resourceModels
// resourceModels may have more than one entry because it is valid for a driver to
// use more than one structured parameter model.
type resourceModels struct {
// TODO: add some structured parameter model
}
// newResourceModel parses the available information about resources. Objects
// with an unknown structured parameter model silently ignored. An error gets
// logged later when parameters required for a pod depend on such an unknown
// model.
func newResourceModel(logger klog.Logger, nodeResourceSliceLister resourcev1alpha2listers.NodeResourceSliceLister, claimAssumeCache volumebinding.AssumeCache) (resources, error) {
model := make(resources)
slices, err := nodeResourceSliceLister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("list node resource slices: %w", err)
}
for _, slice := range slices {
if model[slice.NodeName] == nil {
model[slice.NodeName] = make(map[string]resourceModels)
}
resource := model[slice.NodeName][slice.DriverName]
// TODO: add some structured parameter model
model[slice.NodeName][slice.DriverName] = resource
}
objs := claimAssumeCache.List(nil)
for _, obj := range objs {
claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
if !ok {
return nil, fmt.Errorf("got unexpected object of type %T from claim assume cache", obj)
}
if claim.Status.Allocation == nil {
continue
}
for _, handle := range claim.Status.Allocation.ResourceHandles {
structured := handle.StructuredData
if structured == nil {
continue
}
if model[structured.NodeName] == nil {
model[structured.NodeName] = make(map[string]resourceModels)
}
// resource := model[structured.NodeName][handle.DriverName]
// TODO: add some structured parameter model
// for _, result := range structured.Results {
// // Call AddAllocation for each known model. Each call itself needs to check for nil.
// }
}
}
return model, nil
}
func newClaimController(logger klog.Logger, class *resourcev1alpha2.ResourceClass, classParameters *resourcev1alpha2.ResourceClassParameters, claimParameters *resourcev1alpha2.ResourceClaimParameters) (*claimController, error) {
// Each node driver is separate from the others. Each driver may have
// multiple requests which need to be allocated together, so here
// we have to collect them per model.
// TODO: implement some structured parameters model
c := &claimController{
class: class,
classParameters: classParameters,
claimParameters: claimParameters,
}
return c, nil
}
// claimController currently wraps exactly one structured parameter model.
type claimController struct {
class *resourcev1alpha2.ResourceClass
classParameters *resourcev1alpha2.ResourceClassParameters
claimParameters *resourcev1alpha2.ResourceClaimParameters
// TODO: implement some structured parameters model
}
func (c claimController) nodeIsSuitable(ctx context.Context, nodeName string, resources resources) (bool, error) {
// TODO: implement some structured parameters model
return true, nil
}
func (c claimController) allocate(ctx context.Context, nodeName string, resources resources) (string, *resourcev1alpha2.AllocationResult, error) {
allocation := &resourcev1alpha2.AllocationResult{
Shareable: c.claimParameters.Shareable,
AvailableOnNodes: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{Key: "kubernetes.io/hostname", Operator: v1.NodeSelectorOpIn, Values: []string{nodeName}},
},
},
},
},
}
// TODO: implement some structured parameters model
return c.class.DriverName, allocation, nil
}

View File

@ -72,17 +72,19 @@ const (
// - a Pod that is deleted
// - a Pod that was assumed, but gets un-assumed due to some errors in the binding cycle.
// - an existing Pod that was unscheduled but gets scheduled to a Node.
Pod GVK = "Pod"
Node GVK = "Node"
PersistentVolume GVK = "PersistentVolume"
PersistentVolumeClaim GVK = "PersistentVolumeClaim"
CSINode GVK = "storage.k8s.io/CSINode"
CSIDriver GVK = "storage.k8s.io/CSIDriver"
CSIStorageCapacity GVK = "storage.k8s.io/CSIStorageCapacity"
StorageClass GVK = "storage.k8s.io/StorageClass"
PodSchedulingContext GVK = "PodSchedulingContext"
ResourceClaim GVK = "ResourceClaim"
ResourceClass GVK = "ResourceClass"
Pod GVK = "Pod"
Node GVK = "Node"
PersistentVolume GVK = "PersistentVolume"
PersistentVolumeClaim GVK = "PersistentVolumeClaim"
CSINode GVK = "storage.k8s.io/CSINode"
CSIDriver GVK = "storage.k8s.io/CSIDriver"
CSIStorageCapacity GVK = "storage.k8s.io/CSIStorageCapacity"
StorageClass GVK = "storage.k8s.io/StorageClass"
PodSchedulingContext GVK = "PodSchedulingContext"
ResourceClaim GVK = "ResourceClaim"
ResourceClass GVK = "ResourceClass"
ResourceClaimParameters GVK = "ResourceClaimParameters"
ResourceClassParameters GVK = "ResourceClassParameters"
// WildCard is a special GVK to match all resources.
// e.g., If you register `{Resource: "*", ActionType: All}` in EventsToRegister,
@ -176,6 +178,8 @@ func UnrollWildCardResource() []ClusterEventWithHint {
{Event: ClusterEvent{Resource: PodSchedulingContext, ActionType: All}},
{Event: ClusterEvent{Resource: ResourceClaim, ActionType: All}},
{Event: ClusterEvent{Resource: ResourceClass, ActionType: All}},
{Event: ClusterEvent{Resource: ResourceClaimParameters, ActionType: All}},
{Event: ClusterEvent{Resource: ResourceClassParameters, ActionType: All}},
}
}

View File

@ -644,6 +644,12 @@ func Test_buildQueueingHintMap(t *testing.T) {
{Resource: framework.ResourceClass, ActionType: framework.All}: {
{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.ResourceClaimParameters, ActionType: framework.All}: {
{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
},
{Resource: framework.ResourceClassParameters, ActionType: framework.All}: {
{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
},
},
},
{
@ -768,17 +774,19 @@ func Test_UnionedGVKs(t *testing.T) {
Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.All,
framework.Node: framework.All,
framework.CSINode: framework.All,
framework.CSIDriver: framework.All,
framework.CSIStorageCapacity: framework.All,
framework.PersistentVolume: framework.All,
framework.PersistentVolumeClaim: framework.All,
framework.StorageClass: framework.All,
framework.PodSchedulingContext: framework.All,
framework.ResourceClaim: framework.All,
framework.ResourceClass: framework.All,
framework.Pod: framework.All,
framework.Node: framework.All,
framework.CSINode: framework.All,
framework.CSIDriver: framework.All,
framework.CSIStorageCapacity: framework.All,
framework.PersistentVolume: framework.All,
framework.PersistentVolumeClaim: framework.All,
framework.StorageClass: framework.All,
framework.PodSchedulingContext: framework.All,
framework.ResourceClaim: framework.All,
framework.ResourceClass: framework.All,
framework.ResourceClaimParameters: framework.All,
framework.ResourceClassParameters: framework.All,
},
},
{

View File

@ -576,11 +576,13 @@ func ClusterRoles() []rbacv1.ClusterRole {
// Needed for dynamic resource allocation.
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
kubeSchedulerRules = append(kubeSchedulerRules,
rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("resourceclaims", "resourceclasses").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("resourceclasses").RuleOrDie(),
rbacv1helpers.NewRule(ReadUpdate...).Groups(resourceGroup).Resources("resourceclaims").RuleOrDie(),
rbacv1helpers.NewRule(ReadUpdate...).Groups(resourceGroup).Resources("resourceclaims/status").RuleOrDie(),
rbacv1helpers.NewRule(ReadWrite...).Groups(resourceGroup).Resources("podschedulingcontexts").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("podschedulingcontexts/status").RuleOrDie(),
rbacv1helpers.NewRule(ReadUpdate...).Groups(legacyGroup).Resources("pods/finalizers").RuleOrDie(),
rbacv1helpers.NewRule(Read...).Groups(resourceGroup).Resources("noderesourceslices", "resourceclassparameters", "resourceclaimparameters").RuleOrDie(),
)
}
roles = append(roles, rbacv1.ClusterRole{

View File

@ -183,3 +183,17 @@ func CanBeReserved(claim *resourcev1alpha2.ResourceClaim) bool {
return claim.Status.Allocation.Shareable ||
len(claim.Status.ReservedFor) == 0
}
// IsAllocatedWithStructuredParameters checks whether the claim is allocated
// and the allocation was done with structured parameters.
func IsAllocatedWithStructuredParameters(claim *resourcev1alpha2.ResourceClaim) bool {
if claim.Status.Allocation == nil {
return false
}
for _, handle := range claim.Status.Allocation.ResourceHandles {
if handle.StructuredData != nil {
return true
}
}
return false
}