diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
index fdd0e5533a5..9fce37fdabe 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -283,9 +283,9 @@ type dynamicResources struct {
 	// claimAssumeCache enables temporarily storing a newer claim object
 	// while the scheduler has allocated it and the corresponding object
 	// update from the apiserver has not been processed by the claim
-	// informer callbacks. Claims get added here in Reserve and removed by
+	// informer callbacks. Claims get added here in PreBind and removed by
 	// the informer callback (based on the "newer than" comparison in the
-	// assume cache) or when the API call in PreBind fails.
+	// assume cache).
 	//
 	// It uses cache.MetaNamespaceKeyFunc to generate object names, which
 	// therefore are "<namespace>/<name>".
@@ -304,7 +304,7 @@ type dynamicResources struct {
 	// might have to be managed by the cluster autoscaler.
 	claimAssumeCache volumebinding.AssumeCache
 
-	// inFlightAllocations is map from claim UUIDs to true for those claims
+	// inFlightAllocations is a map from claim UUIDs to claim objects for those claims
 	// for which allocation was triggered during a scheduling cycle and the
 	// corresponding claim status update call in PreBind has not been done
 	// yet. If another pod needs the claim, the pod is treated as "not
@@ -943,7 +943,11 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
 	// problems for using the plugin in the Cluster Autoscaler. If
 	// this step here turns out to be expensive, we may have to
 	// maintain and update state more persistently.
-	resources, err := newResourceModel(logger, pl.resourceSliceLister, pl.claimAssumeCache)
+	//
+	// Claims are treated as "allocated" if they are in the assume cache
+	// or their allocation is currently in-flight.
+	resources, err := newResourceModel(logger, pl.resourceSliceLister, pl.claimAssumeCache, &pl.inFlightAllocations)
+	logger.V(5).Info("Resource usage", "resources", klog.Format(resources))
 	if err != nil {
 		return nil, statusError(logger, err)
 	}
@@ -1382,14 +1386,11 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
 		}
 		state.informationsForClaim[index].allocation = allocation
 		state.informationsForClaim[index].allocationDriverName = driverName
-		pl.inFlightAllocations.Store(claim.UID, true)
 		claim = claim.DeepCopy()
 		claim.Status.DriverName = driverName
 		claim.Status.Allocation = allocation
-		if err := pl.claimAssumeCache.Assume(claim); err != nil {
-			return statusError(logger, fmt.Errorf("update claim assume cache: %v", err))
-		}
-		logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", allocation)
+		pl.inFlightAllocations.Store(claim.UID, claim)
+		logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", klog.Format(allocation))
 	}
 
 	// When there is only one pending resource, we can go ahead with
@@ -1557,7 +1558,7 @@ func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, ind
 	allocationPatch := ""
 
 	allocation := state.informationsForClaim[index].allocation
-	logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", allocation)
+	logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
 
 	// Do we need to store an allocation result from Reserve?
 	if allocation != nil {
@@ -1609,8 +1610,12 @@ func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, ind
 	if allocationPatch != "" {
 		// The scheduler was handling allocation. Now that has
 		// completed, either successfully or with a failure.
-		if err != nil {
-			pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)
+		if err == nil {
+			// Assume can fail, but only for benign reasons (a concurrent
+			// delete or update), which shouldn't happen in this case.
+			if err := pl.claimAssumeCache.Assume(claim); err != nil {
+				logger.V(5).Info("Claim not stored in assume cache", "err", err)
+			}
 		}
 		pl.inFlightAllocations.Delete(claim.UID)
 	}
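The dynamicresources.go hunks above reorder the assume-cache hand-off: Reserve no longer calls Assume (with a Restore in PreBind on failure); instead it parks the prepared claim in inFlightAllocations, and PreBind assumes the claim only after the apiserver write succeeded. A minimal sketch of that pattern, using simplified stand-in types (Claim, assumeCache, plugin are hypothetical, not the real plugin API):

```go
package main

import (
	"fmt"
	"sync"
)

// Claim stands in for *resourcev1alpha2.ResourceClaim.
type Claim struct {
	UID        string
	Allocation string // stands in for the real allocation result
}

// assumeCache is a toy stand-in for the scheduler's assume cache.
type assumeCache struct {
	mu     sync.Mutex
	claims map[string]*Claim
}

func (c *assumeCache) Assume(claim *Claim) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.claims[claim.UID] = claim
	return nil
}

type plugin struct {
	cache               *assumeCache
	inFlightAllocations sync.Map // claim UID -> *Claim
}

// reserve prepares the allocation in memory only: the modified copy of
// the claim is parked in the in-flight map, not in the assume cache.
func (pl *plugin) reserve(claim *Claim, allocation string) {
	claim = &Claim{UID: claim.UID, Allocation: allocation} // "deep copy"
	pl.inFlightAllocations.Store(claim.UID, claim)
}

// preBind simulates the claim status API call. Only on success does the
// claim enter the assume cache; the in-flight entry goes away either way.
func (pl *plugin) preBind(claim *Claim, apiErr error) {
	if apiErr == nil {
		if err := pl.cache.Assume(claim); err != nil {
			fmt.Println("claim not stored in assume cache:", err)
		}
	}
	pl.inFlightAllocations.Delete(claim.UID)
}

func main() {
	pl := &plugin{cache: &assumeCache{claims: map[string]*Claim{}}}
	claim := &Claim{UID: "uid-1"}
	pl.reserve(claim, "gpu-0")
	if obj, ok := pl.inFlightAllocations.Load("uid-1"); ok {
		pl.preBind(obj.(*Claim), nil) // nil: the API write succeeded
	}
	fmt.Println("assumed allocation:", pl.cache.claims["uid-1"].Allocation)
}
```

Because nothing enters the assume cache until the apiserver accepted the write, a failed PreBind needs no rollback; the in-flight entry is dropped either way.
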
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources/namedresourcesmodel.go b/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources/namedresourcesmodel.go
index 08ba8a32dd6..c65265c8380 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources/namedresourcesmodel.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/structured/namedresources/namedresourcesmodel.go
@@ -27,13 +27,16 @@ import (
 	"k8s.io/dynamic-resource-allocation/structured/namedresources/cel"
 )
 
+// These types and fields are all exported to allow logging them with
+// pretty-printed JSON.
+
 type Model struct {
-	instances []instanceAllocation
+	Instances []InstanceAllocation
 }
 
-type instanceAllocation struct {
-	allocated bool
-	instance  *resourceapi.NamedResourcesInstance
+type InstanceAllocation struct {
+	Allocated bool
+	Instance  *resourceapi.NamedResourcesInstance
 }
 
 // AddResources must be called first to create entries for all existing
@@ -44,7 +47,7 @@ func AddResources(m *Model, resources *resourceapi.NamedResourcesResources) {
 	}
 
 	for i := range resources.Instances {
-		m.instances = append(m.instances, instanceAllocation{instance: &resources.Instances[i]})
+		m.Instances = append(m.Instances, InstanceAllocation{Instance: &resources.Instances[i]})
 	}
 }
 
@@ -54,9 +57,9 @@
 	if result == nil {
 		return
 	}
-	for i := range m.instances {
-		if m.instances[i].instance.Name == result.Name {
-			m.instances[i].allocated = true
+	for i := range m.Instances {
+		if m.Instances[i].Instance.Name == result.Name {
+			m.Instances[i].Allocated = true
 			break
 		}
 	}
@@ -103,23 +106,23 @@
 	}
 	results := make([]*resourceapi.NamedResourcesAllocationResult, len(c.requests))
 	for i := range c.requests {
-		results[i] = &resourceapi.NamedResourcesAllocationResult{Name: model.instances[indices[i]].instance.Name}
+		results[i] = &resourceapi.NamedResourcesAllocationResult{Name: model.Instances[indices[i]].Instance.Name}
 	}
 	return results, nil
 }
 
 func (c *Controller) allocate(ctx context.Context, model Model) ([]int, error) {
 	// Shallow copy, we need to modify the allocated boolean.
-	instances := slices.Clone(model.instances)
+	instances := slices.Clone(model.Instances)
 	indices := make([]int, 0, len(c.requests))
 
 	for _, request := range c.requests {
 		for i, instance := range instances {
-			if instance.allocated {
+			if instance.Allocated {
 				continue
 			}
 			if c.filter != nil {
-				okay, err := c.filter.Evaluate(ctx, instance.instance.Attributes)
+				okay, err := c.filter.Evaluate(ctx, instance.Instance.Attributes)
 				if err != nil {
 					return nil, fmt.Errorf("evaluate filter CEL expression: %w", err)
 				}
@@ -127,7 +130,7 @@ func (c *Controller) allocate(ctx context.Context, model Model) ([]int, error) {
 				continue
 			}
 		}
-			okay, err := request.Evaluate(ctx, instance.instance.Attributes)
+			okay, err := request.Evaluate(ctx, instance.Instance.Attributes)
 			if err != nil {
 				return nil, fmt.Errorf("evaluate request CEL expression: %w", err)
 			}
@@ -140,7 +143,7 @@ func (c *Controller) allocate(ctx context.Context, model Model) ([]int, error) {
 			// allocating one "large" instances for a "small" request may
 			// make a following "large" request impossible to satisfy when
 			// only "small" instances are left.
-			instances[i].allocated = true
+			instances[i].Allocated = true
 			indices = append(indices, i)
 			break
 		}
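The field renames in namedresourcesmodel.go exist to support the new klog.Format calls: klog.Format pretty-prints an object via JSON marshaling, and Go's encoding/json silently drops unexported fields. A self-contained illustration of that behavior (hiddenModel is a hypothetical stand-in for the pre-rename struct):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hiddenModel mirrors the pre-rename struct: unexported fields are
// invisible to encoding/json.
type hiddenModel struct {
	instances []string
}

// Model mirrors the post-rename struct: exported fields serialize.
type Model struct {
	Instances []string
}

func main() {
	hidden, _ := json.Marshal(hiddenModel{instances: []string{"gpu-0"}})
	visible, _ := json.Marshal(Model{Instances: []string{"gpu-0"}})
	fmt.Println(string(hidden))  // {}
	fmt.Println(string(visible)) // {"Instances":["gpu-0"]}
}
```

With the lowercase fields, the V(5) log lines added above would render the model as {} regardless of its contents.
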
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go b/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go
index 6ac1d263f16..44b9aee85b9 100644
--- a/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go
+++ b/pkg/scheduler/framework/plugins/dynamicresources/structuredparameters.go
@@ -19,6 +19,7 @@ package dynamicresources
 import (
 	"context"
 	"fmt"
+	"sync"
 
 	v1 "k8s.io/api/core/v1"
 	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
@@ -32,19 +33,19 @@ import (
 
 // resources is a map "node name" -> "driver name" -> available and
 // allocated resources per structured parameter model.
-type resources map[string]map[string]resourceModels
+type resources map[string]map[string]ResourceModels
 
-// resourceModels may have more than one entry because it is valid for a driver to
+// ResourceModels may have more than one entry because it is valid for a driver to
 // use more than one structured parameter model.
-type resourceModels struct {
-	namedresources namedresourcesmodel.Model
+type ResourceModels struct {
+	NamedResources namedresourcesmodel.Model
 }
 
 // newResourceModel parses the available information about resources. Objects
 // with an unknown structured parameter model silently ignored. An error gets
 // logged later when parameters required for a pod depend on such an unknown
 // model.
-func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2listers.ResourceSliceLister, claimAssumeCache volumebinding.AssumeCache) (resources, error) {
+func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2listers.ResourceSliceLister, claimAssumeCache volumebinding.AssumeCache, inFlightAllocations *sync.Map) (resources, error) {
 	model := make(resources)
 
 	slices, err := resourceSliceLister.List(labels.Everything())
@@ -53,10 +54,10 @@ func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2li
 	}
 	for _, slice := range slices {
 		if model[slice.NodeName] == nil {
-			model[slice.NodeName] = make(map[string]resourceModels)
+			model[slice.NodeName] = make(map[string]ResourceModels)
 		}
 		resource := model[slice.NodeName][slice.DriverName]
-		namedresourcesmodel.AddResources(&resource.namedresources, slice.NamedResources)
+		namedresourcesmodel.AddResources(&resource.NamedResources, slice.NamedResources)
 		model[slice.NodeName][slice.DriverName] = resource
 	}
 
@@ -66,6 +67,11 @@ func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2li
 		if !ok {
 			return nil, fmt.Errorf("got unexpected object of type %T from claim assume cache", obj)
 		}
+		if obj, ok := inFlightAllocations.Load(claim.UID); ok {
+			// If the allocation is in-flight, then we have to use the allocation
+			// from that claim.
+			claim = obj.(*resourcev1alpha2.ResourceClaim)
+		}
 		if claim.Status.Allocation == nil {
 			continue
 		}
@@ -75,12 +81,12 @@ func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2li
 			continue
 		}
 		if model[structured.NodeName] == nil {
-			model[structured.NodeName] = make(map[string]resourceModels)
+			model[structured.NodeName] = make(map[string]ResourceModels)
 		}
 		resource := model[structured.NodeName][handle.DriverName]
 		for _, result := range structured.Results {
 			// Call AddAllocation for each known model. Each call itself needs to check for nil.
-			namedresourcesmodel.AddAllocation(&resource.namedresources, result.NamedResources)
+			namedresourcesmodel.AddAllocation(&resource.NamedResources, result.NamedResources)
 		}
 	}
 }
@@ -159,7 +165,7 @@ type perDriverController struct {
 func (c claimController) nodeIsSuitable(ctx context.Context, nodeName string, resources resources) (bool, error) {
 	nodeResources := resources[nodeName]
 	for driverName, perDriver := range c.namedresources {
-		okay, err := perDriver.controller.NodeIsSuitable(ctx, nodeResources[driverName].namedresources)
+		okay, err := perDriver.controller.NodeIsSuitable(ctx, nodeResources[driverName].NamedResources)
 		if err != nil {
 			// This is an error in the CEL expression which needs
 			// to be fixed. Better fail very visibly instead of
@@ -191,7 +197,7 @@ func (c claimController) allocate(ctx context.Context, nodeName string, resource
 	for driverName, perDriver := range c.namedresources {
 		// Must return one entry for each request. The entry may be nil. This way,
 		// the result can be correlated with the per-request parameters.
-		results, err := perDriver.controller.Allocate(ctx, nodeResources[driverName].namedresources)
+		results, err := perDriver.controller.Allocate(ctx, nodeResources[driverName].NamedResources)
 		if err != nil {
 			return "", nil, fmt.Errorf("allocating via named resources structured model: %w", err)
 		}
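With the structuredparameters.go changes, newResourceModel counts in-flight allocations as used capacity: for each claim from the assume cache it first checks inFlightAllocations and, if an entry exists, substitutes the copy stored there by Reserve, whose allocation has not reached the apiserver or the informers yet. A rough sketch of that overlay, again with simplified stand-in types (Claim and the allocatedClaims helper are hypothetical):

```go
package main

import (
	"fmt"
	"sync"
)

// Claim is a simplified stand-in for a ResourceClaim.
type Claim struct {
	UID       string
	Allocated bool
}

// allocatedClaims mirrors the loop in newResourceModel: claims from the
// possibly stale cache are replaced by their in-flight copies before
// deciding whether they consume capacity.
func allocatedClaims(cached []*Claim, inFlight *sync.Map) []*Claim {
	var result []*Claim
	for _, claim := range cached {
		if obj, ok := inFlight.Load(claim.UID); ok {
			// The in-flight copy already carries the allocation.
			claim = obj.(*Claim)
		}
		if claim.Allocated {
			result = append(result, claim)
		}
	}
	return result
}

func main() {
	var inFlight sync.Map
	cached := []*Claim{{UID: "a"}, {UID: "b", Allocated: true}}
	inFlight.Store("a", &Claim{UID: "a", Allocated: true}) // allocation still in flight

	for _, claim := range allocatedClaims(cached, &inFlight) {
		fmt.Println(claim.UID, "counts as allocated")
	}
}
```

Without this substitution, a second pod scheduled in the same window could be offered instances that Reserve already handed out, because neither the informer cache nor the assume cache has seen the allocation yet.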