parse node selector in prefilter

Authored by AxeZhan on 2023-08-13 17:34:16 +08:00; committed by AxeZhan
parent 6e0cb243d5
commit 47fec59a31
2 changed files with 48 additions and 39 deletions
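In short, the claim's AvailableOnNodes selector and the class's SuitableNodes filter are now converted to nodeaffinity.NodeSelector once in PreFilter (per pod) instead of repeatedly in Filter (per node), with the parsed result carried in the cycle state. A minimal sketch of that pattern, assuming a stripped-down plugin; the names stateKey, preParsedState, preFilter and filter below are illustrative, not the plugin's real identifiers:

package sketch

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// stateKey and preParsedState are illustrative names, not the plugin's
// real identifiers.
const stateKey framework.StateKey = "sketch-parsed-node-selectors"

// preParsedState keeps one parsed selector per claim; a nil entry means
// that claim has no node filter.
type preParsedState struct {
	selectors []*nodeaffinity.NodeSelector
}

func (s *preParsedState) Clone() framework.StateData { return s }

// preFilter converts each v1.NodeSelector exactly once per pod and stores
// the result in the CycleState for the later per-node Filter calls.
func preFilter(cs *framework.CycleState, nodeSelectors []*v1.NodeSelector) error {
	s := &preParsedState{selectors: make([]*nodeaffinity.NodeSelector, len(nodeSelectors))}
	for i, ns := range nodeSelectors {
		if ns == nil {
			continue
		}
		parsed, err := nodeaffinity.NewNodeSelector(ns)
		if err != nil {
			return err
		}
		s.selectors[i] = parsed
	}
	cs.Write(stateKey, s)
	return nil
}

// filter only matches the already-parsed selectors against the node, so
// nothing is parsed on the per-node hot path anymore.
func filter(cs *framework.CycleState, node *v1.Node) (bool, error) {
	data, err := cs.Read(stateKey)
	if err != nil {
		return false, err
	}
	s, ok := data.(*preParsedState)
	if !ok {
		return false, fmt.Errorf("unexpected state type %T", data)
	}
	for _, sel := range s.selectors {
		if sel != nil && !sel.Match(node) {
			return false, nil
		}
	}
	return true, nil
}

PreFilter runs once per pod while Filter runs once per candidate node, so hoisting the NewNodeSelector calls (and the resource class lookup) out of Filter removes that work from the per-node path.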

@@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/klog/v2"
@@ -64,11 +63,6 @@ type stateData struct {
// Empty if the Pod has no claims.
claims []*resourcev1alpha2.ResourceClaim
// The AvailableOnNodes node filters of the claims converted from the
// v1 API to nodeaffinity.NodeSelector by PreFilter for repeated
// evaluation in Filter. Nil for claims which don't have it.
availableOnNodes []*nodeaffinity.NodeSelector
// The indices of all claims that:
// - are allocated
// - use delayed allocation
@@ -91,6 +85,19 @@ type stateData struct {
podSchedulingDirty bool
mutex sync.Mutex
informationsForClaim []informationForClaim
}
type informationForClaim struct {
// The availableOnNode node filter of the claim converted from the
// v1 API to nodeaffinity.NodeSelector by PreFilter for repeated
// evaluation in Filter. Nil for a claim that doesn't have one.
availableOnNode *nodeaffinity.NodeSelector
// The status of the claim obtained from the
// schedulingCtx by PreFilter for repeated
// evaluation in Filter. Nil for a claim that doesn't have one.
status *resourcev1alpha2.ResourceClaimSchedulingStatus
}
func (d *stateData) Clone() framework.StateData {
@@ -555,7 +562,7 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
return nil, framework.NewStatus(framework.Skip)
}
s.availableOnNodes = make([]*nodeaffinity.NodeSelector, len(claims))
s.informationsForClaim = make([]informationForClaim, len(claims))
for index, claim := range claims {
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate &&
claim.Status.Allocation == nil {
@@ -578,7 +585,32 @@ func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
if err != nil {
return nil, statusError(logger, err)
}
s.availableOnNodes[index] = nodeSelector
s.informationsForClaim[index].availableOnNode = nodeSelector
}
if claim.Status.Allocation == nil &&
claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer {
// The ResourceClass might have a node filter. This is
// useful for trimming the initial set of potential
// nodes before we ask the driver(s) for information
// about the specific pod.
class, err := pl.classLister.Get(claim.Spec.ResourceClassName)
if err != nil {
// If the class does not exist, then allocation cannot proceed.
return nil, statusError(logger, fmt.Errorf("look up resource class: %v", err))
}
if class.SuitableNodes != nil {
selector, err := nodeaffinity.NewNodeSelector(class.SuitableNodes)
if err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim[index].availableOnNode = selector
}
// Now we need information from drivers.
schedulingCtx, err := s.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
if err != nil {
return nil, statusError(logger, err)
}
s.informationsForClaim[index].status = statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name)
}
}
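For context, the SuitableNodes filter that PreFilter now parses is an ordinary v1.NodeSelector on the ResourceClass object. A rough illustration of such a class follows; the driver name and label key are made-up examples, not values from this commit:

package sketch

import (
	v1 "k8s.io/api/core/v1"
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
)

// exampleClass is the kind of object whose SuitableNodes filter PreFilter
// now parses once per pod. The driver name and label key are illustrative.
var exampleClass = &resourcev1alpha2.ResourceClass{
	ObjectMeta: metav1.ObjectMeta{Name: "example-class"},
	DriverName: "example.com/driver",
	SuitableNodes: &v1.NodeSelector{
		NodeSelectorTerms: []v1.NodeSelectorTerm{{
			MatchExpressions: []v1.NodeSelectorRequirement{{
				Key:      "example.com/has-device",
				Operator: v1.NodeSelectorOpIn,
				Values:   []string{"true"},
			}},
		}},
	},
}

// parseSuitableNodes mirrors what PreFilter does with the class: convert
// the v1 selector into a nodeaffinity.NodeSelector so Filter can call
// Match per node without re-parsing.
func parseSuitableNodes() (*nodeaffinity.NodeSelector, error) {
	return nodeaffinity.NewNodeSelector(exampleClass.SuitableNodes)
}

Parsing this once per pod is what lets Filter fall back to a plain selector.Match(node) call, as in the hunk below.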
@@ -633,7 +665,7 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState
logger.V(10).Info("filtering based on resource claims of the pod", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
switch {
case claim.Status.Allocation != nil:
if nodeSelector := state.availableOnNodes[index]; nodeSelector != nil {
if nodeSelector := state.informationsForClaim[index].availableOnNode; nodeSelector != nil {
if !nodeSelector.Match(node) {
logger.V(5).Info("AvailableOnNodes does not match", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
unavailableClaims = append(unavailableClaims, index)
@@ -643,33 +675,12 @@ func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState
// We shouldn't get here. PreFilter already checked this.
return statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
case claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer:
// The ResourceClass might have a node filter. This is
// useful for trimming the initial set of potential
// nodes before we ask the driver(s) for information
// about the specific pod.
class, err := pl.classLister.Get(claim.Spec.ResourceClassName)
if err != nil {
// If the class does not exist, then allocation cannot proceed.
return statusError(logger, fmt.Errorf("look up resource class: %v", err))
}
if class.SuitableNodes != nil {
// TODO (#113700): parse class.SuitableNodes once in PreFilter, reuse result.
matches, err := corev1helpers.MatchNodeSelectorTerms(node, class.SuitableNodes)
if err != nil {
return statusError(logger, fmt.Errorf("potential node filter: %v", err))
}
if !matches {
return statusUnschedulable(logger, "excluded by resource class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclass", klog.KObj(class))
if selector := state.informationsForClaim[index].availableOnNode; selector != nil {
if matches := selector.Match(node); !matches {
return statusUnschedulable(logger, "excluded by resource class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclassName", claim.Spec.ResourceClassName)
}
}
// Now we need information from drivers.
schedulingCtx, err := state.initializePodSchedulingContexts(ctx, pod, pl.podSchedulingContextLister)
if err != nil {
return statusError(logger, err)
}
status := statusForClaim(schedulingCtx, pod.Spec.ResourceClaims[index].Name)
if status != nil {
if status := state.informationsForClaim[index].status; status != nil {
for _, unsuitableNode := range status.UnsuitableNodes {
if node.Name == unsuitableNode {
return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes)

@@ -330,13 +330,11 @@ func TestPlugin(t *testing.T) {
pod: podWithClaimName,
claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
want: want{
filter: perNodeResult{
workerNode.Name: {
status: framework.AsStatus(fmt.Errorf(`look up resource class: resourceclass.resource.k8s.io "%s" not found`, className)),
},
prefilter: result{
status: framework.AsStatus(fmt.Errorf(`look up resource class: resourceclass.resource.k8s.io "%s" not found`, className)),
},
postfilter: result{
status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
},
},
},