Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-09 20:17:41 +00:00)
dra scheduler: consider in-flight allocation for resource calculation
Storing a modified claim with the allocation and the original resource version in the assume cache was not reliable: if an update was received, it replaced the modified claim, and the resources reserved for that claim might then have been handed out to some other claim.

To fix this, the in-flight map now stores the claim objects themselves instead of just a boolean, and the status stored there overrides whatever is in the assume cache.

Logging got extended to diagnose this problem better. It started to occur in E2E tests after splitting the claim update so that first the finalizer is set and then the status, because setting the finalizer alone triggered an update.
parent 2c6246c906
commit 251b3859b0
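The core of the fix is a precedence rule: a claim whose allocation is still in flight must win over whatever object the assume cache currently holds, because an informer update can replace the assumed claim at any time. Below is a minimal, self-contained sketch of that rule; the type and helper names are illustrative stand-ins, not the plugin's real code.

package main

import (
	"fmt"
	"sync"
)

// claim stands in for *resourcev1alpha2.ResourceClaim; only the field
// relevant to the race is modeled here.
type claim struct {
	uid       string
	allocated bool
}

// claimForScheduling applies the precedence this commit introduces: the
// in-flight map overrides the assume cache, whose entry an informer
// update may have overwritten.
func claimForScheduling(cached *claim, inFlight *sync.Map) *claim {
	if obj, ok := inFlight.Load(cached.uid); ok {
		return obj.(*claim)
	}
	return cached
}

func main() {
	var inFlight sync.Map
	// Reserve stored an allocated copy; an informer update then replaced
	// the assume cache entry with a not-yet-allocated object.
	inFlight.Store("uid-1", &claim{uid: "uid-1", allocated: true})
	fromCache := &claim{uid: "uid-1", allocated: false}
	fmt.Println(claimForScheduling(fromCache, &inFlight).allocated) // true
}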
@@ -283,9 +283,9 @@ type dynamicResources struct {
 	// claimAssumeCache enables temporarily storing a newer claim object
 	// while the scheduler has allocated it and the corresponding object
 	// update from the apiserver has not been processed by the claim
-	// informer callbacks. Claims get added here in Reserve and removed by
+	// informer callbacks. Claims get added here in PreBind and removed by
 	// the informer callback (based on the "newer than" comparison in the
-	// assume cache) or when the API call in PreBind fails.
+	// assume cache).
 	//
 	// It uses cache.MetaNamespaceKeyFunc to generate object names, which
 	// therefore are "<namespace>/<name>".
@@ -304,7 +304,7 @@ type dynamicResources struct {
 	// might have to be managed by the cluster autoscaler.
 	claimAssumeCache volumebinding.AssumeCache
 
-	// inFlightAllocations is map from claim UUIDs to true for those claims
+	// inFlightAllocations is map from claim UUIDs to claim objects for those claims
 	// for which allocation was triggered during a scheduling cycle and the
 	// corresponding claim status update call in PreBind has not been done
 	// yet. If another pod needs the claim, the pod is treated as "not
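The field itself is a sync.Map, which fits here because Reserve writes it while scheduling code reads it concurrently. A hedged sketch of its new contents, assuming the v1alpha2 resource API imported elsewhere in this diff; the helper names are illustrative, not the plugin's:

package inflight

import (
	"sync"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	"k8s.io/apimachinery/pkg/types"
)

// inFlightAllocations maps types.UID -> *resourcev1alpha2.ResourceClaim,
// matching the field documented in the hunk above.
var inFlightAllocations sync.Map

// storeInFlight records the full claim object, not just a boolean marker.
func storeInFlight(c *resourcev1alpha2.ResourceClaim) {
	inFlightAllocations.Store(c.UID, c)
}

// loadInFlight returns the claim whose allocation is still in flight, if any.
func loadInFlight(uid types.UID) (*resourcev1alpha2.ResourceClaim, bool) {
	obj, ok := inFlightAllocations.Load(uid)
	if !ok {
		return nil, false
	}
	return obj.(*resourcev1alpha2.ResourceClaim), true
}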
@@ -943,7 +943,11 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
 		// problems for using the plugin in the Cluster Autoscaler. If
 		// this step here turns out to be expensive, we may have to
 		// maintain and update state more persistently.
-		resources, err := newResourceModel(logger, pl.resourceSliceLister, pl.claimAssumeCache)
+		//
+		// Claims are treated as "allocated" if they are in the assume cache
+		// or currently their allocation is in-flight.
+		resources, err := newResourceModel(logger, pl.resourceSliceLister, pl.claimAssumeCache, &pl.inFlightAllocations)
+		logger.V(5).Info("Resource usage", "resources", klog.Format(resources))
 		if err != nil {
 			return nil, statusError(logger, err)
 		}
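The added log line uses klog.Format, which wraps the value so that the pretty-printed JSON rendering only happens if the V(5) entry is actually emitted; detailed payloads therefore stay cheap at lower verbosity. A small usage sketch, assuming k8s.io/klog/v2 behaves as documented (output appears only when run with -v=5):

package main

import (
	"flag"

	"k8s.io/klog/v2"
)

type usage struct{ Allocated bool }

func main() {
	klog.InitFlags(nil)
	flag.Parse() // run with -v=5 to see the message
	logger := klog.Background()
	// klog.Format defers marshaling; nothing is rendered if V(5) is disabled.
	logger.V(5).Info("Resource usage", "resources", klog.Format(usage{Allocated: true}))
	klog.Flush()
}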
@@ -1382,14 +1386,11 @@ func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleStat
 			}
 			state.informationsForClaim[index].allocation = allocation
 			state.informationsForClaim[index].allocationDriverName = driverName
-			pl.inFlightAllocations.Store(claim.UID, true)
 			claim = claim.DeepCopy()
 			claim.Status.DriverName = driverName
 			claim.Status.Allocation = allocation
-			if err := pl.claimAssumeCache.Assume(claim); err != nil {
-				return statusError(logger, fmt.Errorf("update claim assume cache: %v", err))
-			}
-			logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", allocation)
+			pl.inFlightAllocations.Store(claim.UID, claim)
+			logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", klog.Format(allocation))
 		}
 
 		// When there is only one pending resource, we can go ahead with
@@ -1557,7 +1558,7 @@ func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, ind
 	allocationPatch := ""
 
 	allocation := state.informationsForClaim[index].allocation
-	logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", allocation)
+	logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
 
 	// Do we need to store an allocation result from Reserve?
 	if allocation != nil {
@@ -1609,8 +1610,12 @@ func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, ind
 	if allocationPatch != "" {
 		// The scheduler was handling allocation. Now that has
 		// completed, either successfully or with a failure.
-		if err != nil {
-			pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)
+		if err == nil {
+			// This can fail, but only for reasons that are okay (concurrent delete or update).
+			// Shouldn't happen in this case.
+			if err := pl.claimAssumeCache.Assume(claim); err != nil {
+				logger.V(5).Info("Claim not stored in assume cache", "err", err)
+			}
 		}
 		pl.inFlightAllocations.Delete(claim.UID)
 	}

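Taken together, the Reserve and bindClaim hunks above change the lifecycle: Reserve only records the allocated claim in the in-flight map, while PreBind mirrors it into the assume cache on success and always clears the in-flight entry. A condensed, hypothetical sketch of that ordering, with a stand-in interface for the volumebinding.AssumeCache methods used above:

package lifecycle

import "sync"

// assumeCache is a stand-in for the one AssumeCache method used here.
type assumeCache interface {
	Assume(obj interface{}) error
}

type claim struct{ uid string }

// reserve: allocation succeeded in this scheduling cycle; remember the
// modified claim without touching the assume cache yet.
func reserve(inFlight *sync.Map, c *claim) {
	inFlight.Store(c.uid, c)
}

// afterBindAPICall: the status update in PreBind finished; mirror the
// result into the assume cache only on success, and always clear the
// in-flight entry. apiErr is the error from the claim update call.
func afterBindAPICall(inFlight *sync.Map, cache assumeCache, c *claim, apiErr error) {
	if apiErr == nil {
		// This can fail, but only for reasons that are okay
		// (concurrent delete or update).
		_ = cache.Assume(c)
	}
	inFlight.Delete(c.uid)
}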
@@ -27,13 +27,16 @@ import (
 	"k8s.io/dynamic-resource-allocation/structured/namedresources/cel"
 )
 
+// These types and fields are all exported to allow logging them with
+// pretty-printed JSON.
+
 type Model struct {
-	instances []instanceAllocation
+	Instances []InstanceAllocation
 }
 
-type instanceAllocation struct {
-	allocated bool
-	instance *resourceapi.NamedResourcesInstance
+type InstanceAllocation struct {
+	Allocated bool
+	Instance *resourceapi.NamedResourcesInstance
 }
 
 // AddResources must be called first to create entries for all existing
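The renames in this hunk exist because encoding/json, which produces the pretty-printed output mentioned in the new comment, silently skips unexported struct fields. A quick demonstration of the difference:

package main

import (
	"encoding/json"
	"fmt"
)

type hidden struct{ allocated bool }  // unexported field: invisible to JSON
type visible struct{ Allocated bool } // exported field: marshaled

func main() {
	a, _ := json.Marshal(hidden{allocated: true})
	b, _ := json.Marshal(visible{Allocated: true})
	fmt.Println(string(a)) // {}
	fmt.Println(string(b)) // {"Allocated":true}
}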
@@ -44,7 +47,7 @@ func AddResources(m *Model, resources *resourceapi.NamedResourcesResources) {
 	}
 
 	for i := range resources.Instances {
-		m.instances = append(m.instances, instanceAllocation{instance: &resources.Instances[i]})
+		m.Instances = append(m.Instances, InstanceAllocation{Instance: &resources.Instances[i]})
 	}
 }
 
@@ -54,9 +57,9 @@ func AddAllocation(m *Model, result *resourceapi.NamedResourcesAllocationResult)
 	if result == nil {
 		return
 	}
-	for i := range m.instances {
-		if m.instances[i].instance.Name == result.Name {
-			m.instances[i].allocated = true
+	for i := range m.Instances {
+		if m.Instances[i].Instance.Name == result.Name {
+			m.Instances[i].Allocated = true
 			break
 		}
 	}
@@ -103,23 +106,23 @@ func (c *Controller) Allocate(ctx context.Context, model Model) ([]*resourceapi.
 	}
 	results := make([]*resourceapi.NamedResourcesAllocationResult, len(c.requests))
 	for i := range c.requests {
-		results[i] = &resourceapi.NamedResourcesAllocationResult{Name: model.instances[indices[i]].instance.Name}
+		results[i] = &resourceapi.NamedResourcesAllocationResult{Name: model.Instances[indices[i]].Instance.Name}
 	}
 	return results, nil
 }
 
 func (c *Controller) allocate(ctx context.Context, model Model) ([]int, error) {
 	// Shallow copy, we need to modify the allocated boolean.
-	instances := slices.Clone(model.instances)
+	instances := slices.Clone(model.Instances)
 	indices := make([]int, 0, len(c.requests))
 
 	for _, request := range c.requests {
 		for i, instance := range instances {
-			if instance.allocated {
+			if instance.Allocated {
 				continue
 			}
 			if c.filter != nil {
-				okay, err := c.filter.Evaluate(ctx, instance.instance.Attributes)
+				okay, err := c.filter.Evaluate(ctx, instance.Instance.Attributes)
 				if err != nil {
 					return nil, fmt.Errorf("evaluate filter CEL expression: %w", err)
 				}
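The "shallow copy" comment is load-bearing: slices.Clone copies the InstanceAllocation structs themselves, so flipping Allocated in the clone leaves model.Instances untouched, while the *NamedResourcesInstance pointers stay shared. A minimal demonstration with simplified types:

package main

import (
	"fmt"
	"slices"
)

type name struct{ s string }

type instanceAlloc struct {
	Allocated bool
	Instance  *name // pointer remains shared after Clone
}

func main() {
	orig := []instanceAlloc{{Instance: &name{"gpu-0"}}}
	tmp := slices.Clone(orig) // copies the structs, not the pointees
	tmp[0].Allocated = true
	fmt.Println(orig[0].Allocated, tmp[0].Allocated) // false true
	fmt.Println(orig[0].Instance == tmp[0].Instance) // true
}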
@@ -127,7 +130,7 @@ func (c *Controller) allocate(ctx context.Context, model Model) ([]int, error) {
 					continue
 				}
 			}
-			okay, err := request.Evaluate(ctx, instance.instance.Attributes)
+			okay, err := request.Evaluate(ctx, instance.Instance.Attributes)
 			if err != nil {
 				return nil, fmt.Errorf("evaluate request CEL expression: %w", err)
 			}
@@ -140,7 +143,7 @@ func (c *Controller) allocate(ctx context.Context, model Model) ([]int, error) {
 			// allocating one "large" instances for a "small" request may
 			// make a following "large" request impossible to satisfy when
 			// only "small" instances are left.
-			instances[i].allocated = true
+			instances[i].Allocated = true
 			indices = append(indices, i)
 			break
 		}

@@ -19,6 +19,7 @@ package dynamicresources
 import (
 	"context"
 	"fmt"
+	"sync"
 
 	v1 "k8s.io/api/core/v1"
 	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
@@ -32,19 +33,19 @@ import (
 
 // resources is a map "node name" -> "driver name" -> available and
 // allocated resources per structured parameter model.
-type resources map[string]map[string]resourceModels
+type resources map[string]map[string]ResourceModels
 
-// resourceModels may have more than one entry because it is valid for a driver to
+// ResourceModels may have more than one entry because it is valid for a driver to
 // use more than one structured parameter model.
-type resourceModels struct {
-	namedresources namedresourcesmodel.Model
+type ResourceModels struct {
+	NamedResources namedresourcesmodel.Model
 }
 
 // newResourceModel parses the available information about resources. Objects
 // with an unknown structured parameter model silently ignored. An error gets
 // logged later when parameters required for a pod depend on such an unknown
 // model.
-func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2listers.ResourceSliceLister, claimAssumeCache volumebinding.AssumeCache) (resources, error) {
+func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2listers.ResourceSliceLister, claimAssumeCache volumebinding.AssumeCache, inFlightAllocations *sync.Map) (resources, error) {
 	model := make(resources)
 
 	slices, err := resourceSliceLister.List(labels.Everything())
@@ -53,10 +54,10 @@ func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2li
 	}
 	for _, slice := range slices {
 		if model[slice.NodeName] == nil {
-			model[slice.NodeName] = make(map[string]resourceModels)
+			model[slice.NodeName] = make(map[string]ResourceModels)
 		}
 		resource := model[slice.NodeName][slice.DriverName]
-		namedresourcesmodel.AddResources(&resource.namedresources, slice.NamedResources)
+		namedresourcesmodel.AddResources(&resource.NamedResources, slice.NamedResources)
 		model[slice.NodeName][slice.DriverName] = resource
 	}
 
@@ -66,6 +67,11 @@ func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2li
 		if !ok {
 			return nil, fmt.Errorf("got unexpected object of type %T from claim assume cache", obj)
 		}
+		if obj, ok := inFlightAllocations.Load(claim.UID); ok {
+			// If the allocation is in-flight, then we have to use the allocation
+			// from that claim.
+			claim = obj.(*resourcev1alpha2.ResourceClaim)
+		}
 		if claim.Status.Allocation == nil {
 			continue
 		}
@@ -75,12 +81,12 @@ func newResourceModel(logger klog.Logger, resourceSliceLister resourcev1alpha2li
 			continue
 		}
 		if model[structured.NodeName] == nil {
-			model[structured.NodeName] = make(map[string]resourceModels)
+			model[structured.NodeName] = make(map[string]ResourceModels)
 		}
 		resource := model[structured.NodeName][handle.DriverName]
 		for _, result := range structured.Results {
 			// Call AddAllocation for each known model. Each call itself needs to check for nil.
-			namedresourcesmodel.AddAllocation(&resource.namedresources, result.NamedResources)
+			namedresourcesmodel.AddAllocation(&resource.NamedResources, result.NamedResources)
 		}
 	}
 }
@@ -159,7 +165,7 @@ type perDriverController struct {
 func (c claimController) nodeIsSuitable(ctx context.Context, nodeName string, resources resources) (bool, error) {
 	nodeResources := resources[nodeName]
 	for driverName, perDriver := range c.namedresources {
-		okay, err := perDriver.controller.NodeIsSuitable(ctx, nodeResources[driverName].namedresources)
+		okay, err := perDriver.controller.NodeIsSuitable(ctx, nodeResources[driverName].NamedResources)
 		if err != nil {
 			// This is an error in the CEL expression which needs
 			// to be fixed. Better fail very visibly instead of
@@ -191,7 +197,7 @@ func (c claimController) allocate(ctx context.Context, nodeName string, resource
 	for driverName, perDriver := range c.namedresources {
 		// Must return one entry for each request. The entry may be nil. This way,
 		// the result can be correlated with the per-request parameters.
-		results, err := perDriver.controller.Allocate(ctx, nodeResources[driverName].namedresources)
+		results, err := perDriver.controller.Allocate(ctx, nodeResources[driverName].NamedResources)
 		if err != nil {
 			return "", nil, fmt.Errorf("allocating via named resources structured model: %w", err)
 		}
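One subtlety in the nodeResources[driverName].NamedResources expressions above: indexing a Go map with a missing key yields the zero value of ResourceModels, so a driver with no published resources simply contributes an empty model rather than a nil-pointer hazard. A tiny sketch of that shape, with simplified types:

package main

import "fmt"

type Model struct{ Instances []string }

type ResourceModels struct{ NamedResources Model }

func main() {
	nodeResources := map[string]ResourceModels{}
	// Missing keys return the zero value, so no ok-check is needed before
	// handing the per-driver model to the controller.
	m := nodeResources["driver-without-slices"].NamedResources
	fmt.Println(len(m.Instances)) // 0
}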