Add AddedAffinity to the NodeAffinityArgs

And use it in Filter and Score.

Change-Id: I173d8f2d5578762e9873181d5b44ea30b6dbbbc2
This commit is contained in:
Aldo Culquicondor
2020-10-23 13:21:10 -04:00
parent bf5382a53e
commit 3ce145787a
18 changed files with 542 additions and 55 deletions

View File

@@ -23,13 +23,17 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/framework"
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)
// NodeAffinity is a plugin that checks if a pod node selector matches the node label.
type NodeAffinity struct {
handle framework.Handle
handle framework.Handle
addedNodeSelector *nodeaffinity.NodeSelector
addedPrefSchedTerms *nodeaffinity.PreferredSchedulingTerms
}
var _ framework.FilterPlugin = &NodeAffinity{}
@@ -39,8 +43,11 @@ const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = "NodeAffinity"
// ErrReason for node affinity/selector not matching.
ErrReason = "node(s) didn't match node selector"
// ErrReasonPod is the reason for Pod's node affinity/selector not matching.
ErrReasonPod = "node(s) didn't match Pod's node affinity"
// errReasonEnforced is the reason for added node affinity not matching.
errReasonEnforced = "node(s) didn't match scheduler-enforced node affinity"
)
// Name returns name of the plugin. It is used in logs, etc.
@@ -48,19 +55,25 @@ func (pl *NodeAffinity) Name() string {
return Name
}
// Filter invoked at the filter extension point.
// Filter checks if the Node matches the Pod .spec.affinity.nodeAffinity and
// the plugin's added affinity.
func (pl *NodeAffinity) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
node := nodeInfo.Node()
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
if pl.addedNodeSelector != nil && !pl.addedNodeSelector.Match(node) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, errReasonEnforced)
}
if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod)
}
return nil
}
// Score invoked at the Score extension point.
// Score returns the sum of the weights of the terms that match the Node.
// Terms came from the Pod .spec.affinity.nodeAffinity and from the plugin's
// default affinity.
func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
@@ -75,6 +88,9 @@ func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState,
affinity := pod.Spec.Affinity
var count int64
if pl.addedPrefSchedTerms != nil {
count += pl.addedPrefSchedTerms.Score(node)
}
// A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
// An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
// empty PreferredSchedulingTerm matches all objects.
@@ -101,6 +117,36 @@ func (pl *NodeAffinity) ScoreExtensions() framework.ScoreExtensions {
}
// New initializes a new plugin and returns it.
func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
return &NodeAffinity{handle: h}, nil
func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
args, err := getArgs(plArgs)
if err != nil {
return nil, err
}
pl := &NodeAffinity{
handle: h,
}
if args.AddedAffinity != nil {
if ns := args.AddedAffinity.RequiredDuringSchedulingIgnoredDuringExecution; ns != nil {
pl.addedNodeSelector, err = nodeaffinity.NewNodeSelector(ns)
if err != nil {
return nil, fmt.Errorf("parsing addedAffinity.requiredDuringSchedulingIgnoredDuringExecution: %w", err)
}
}
// TODO: parse requiredDuringSchedulingRequiredDuringExecution when it gets added to the API.
if terms := args.AddedAffinity.PreferredDuringSchedulingIgnoredDuringExecution; len(terms) != 0 {
pl.addedPrefSchedTerms, err = nodeaffinity.NewPreferredSchedulingTerms(terms)
if err != nil {
return nil, fmt.Errorf("parsing addedAffinity.preferredDuringSchedulingIgnoredDuringExecution: %w", err)
}
}
}
return pl, nil
}
func getArgs(obj runtime.Object) (config.NodeAffinityArgs, error) {
ptr, ok := obj.(*config.NodeAffinityArgs)
if !ok {
return config.NodeAffinityArgs{}, fmt.Errorf("args are not of type NodeAffinityArgs, got %T", obj)
}
return *ptr, validation.ValidateNodeAffinityArgs(ptr)
}