mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 04:06:03 +00:00
Merge pull request #86638 from danielqsj/ServiceAntiAffinityPriority
Move ServiceAntiAffinityPriority to score plugin
This commit is contained in:
commit
b52cdca17d
@ -56,9 +56,8 @@ func NewMetadataFactory(
|
||||
|
||||
// priorityMetadata is a type that is passed as metadata for priority functions
|
||||
type priorityMetadata struct {
|
||||
podSelector labels.Selector
|
||||
podFirstServiceSelector labels.Selector
|
||||
podTopologySpreadMap *podTopologySpreadMap
|
||||
podSelector labels.Selector
|
||||
podTopologySpreadMap *podTopologySpreadMap
|
||||
}
|
||||
|
||||
// PriorityMetadata is a MetadataProducer. Node info can be nil.
|
||||
@ -83,20 +82,11 @@ func (pmf *MetadataFactory) PriorityMetadata(
|
||||
return nil
|
||||
}
|
||||
return &priorityMetadata{
|
||||
podSelector: getSelector(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
|
||||
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
|
||||
podTopologySpreadMap: tpSpreadMap,
|
||||
podSelector: getSelector(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
|
||||
podTopologySpreadMap: tpSpreadMap,
|
||||
}
|
||||
}
|
||||
|
||||
// getFirstServiceSelector returns one selector of services the given pod.
|
||||
func getFirstServiceSelector(pod *v1.Pod, sl corelisters.ServiceLister) (firstServiceSelector labels.Selector) {
|
||||
if services, err := schedulerlisters.GetPodServices(sl, pod); err == nil && len(services) > 0 {
|
||||
return labels.SelectorFromSet(services[0].Spec.Selector)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getSelector returns a selector for the services, RCs, RSs, and SSs matching the given pod.
|
||||
func getSelector(pod *v1.Pod, sl corelisters.ServiceLister, cl corelisters.ReplicationControllerLister, rsl appslisters.ReplicaSetLister, ssl appslisters.StatefulSetLister) labels.Selector {
|
||||
labelSet := make(labels.Set)
|
||||
|
@ -154,23 +154,6 @@ func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interfa
|
||||
return nil
|
||||
}
|
||||
|
||||
// ServiceAntiAffinity contains information to calculate service anti-affinity priority.
|
||||
type ServiceAntiAffinity struct {
|
||||
podLister schedulerlisters.PodLister
|
||||
serviceLister corelisters.ServiceLister
|
||||
labels []string
|
||||
}
|
||||
|
||||
// NewServiceAntiAffinityPriority creates a ServiceAntiAffinity.
|
||||
func NewServiceAntiAffinityPriority(podLister schedulerlisters.PodLister, serviceLister corelisters.ServiceLister, labels []string) (PriorityMapFunction, PriorityReduceFunction) {
|
||||
antiAffinity := &ServiceAntiAffinity{
|
||||
podLister: podLister,
|
||||
serviceLister: serviceLister,
|
||||
labels: labels,
|
||||
}
|
||||
return antiAffinity.CalculateAntiAffinityPriorityMap, antiAffinity.CalculateAntiAffinityPriorityReduce
|
||||
}
|
||||
|
||||
// countMatchingPods counts pods based on namespace and matching all selectors
|
||||
func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *schedulernodeinfo.NodeInfo) int {
|
||||
if nodeInfo.Pods() == nil || len(nodeInfo.Pods()) == 0 || selector.Empty() {
|
||||
@ -188,103 +171,3 @@ func countMatchingPods(namespace string, selector labels.Selector, nodeInfo *sch
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// CalculateAntiAffinityPriorityMap spreads pods by minimizing the number of pods belonging to the same service
|
||||
// on given machine
|
||||
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
|
||||
var firstServiceSelector labels.Selector
|
||||
|
||||
node := nodeInfo.Node()
|
||||
if node == nil {
|
||||
return framework.NodeScore{}, fmt.Errorf("node not found")
|
||||
}
|
||||
priorityMeta, ok := meta.(*priorityMetadata)
|
||||
if ok {
|
||||
firstServiceSelector = priorityMeta.podFirstServiceSelector
|
||||
} else {
|
||||
firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
|
||||
}
|
||||
// Pods matched namespace,selector on current node.
|
||||
var selector labels.Selector
|
||||
if firstServiceSelector != nil {
|
||||
selector = firstServiceSelector
|
||||
} else {
|
||||
selector = labels.NewSelector()
|
||||
}
|
||||
score := countMatchingPods(pod.Namespace, selector, nodeInfo)
|
||||
|
||||
return framework.NodeScore{
|
||||
Name: node.Name,
|
||||
Score: int64(score),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CalculateAntiAffinityPriorityReduce computes each node score with the same value for a particular label.
|
||||
// The label to be considered is provided to the struct (ServiceAntiAffinity).
|
||||
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityReduce(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error {
|
||||
reduceResult := make([]float64, len(result))
|
||||
for _, label := range s.labels {
|
||||
if err := s.updateNodeScoresForLabel(sharedLister, result, reduceResult, label); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Update the result after all labels have been evaluated.
|
||||
for i, nodeScore := range reduceResult {
|
||||
result[i].Score = int64(nodeScore)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateNodeScoresForLabel updates the node scores for a single label. Note it does not update the
|
||||
// original result from the map phase directly, but instead updates the reduceResult, which is used
|
||||
// to update the original result finally. This makes sure that each call to updateNodeScoresForLabel
|
||||
// receives the same mapResult to work with.
|
||||
// Why are doing this? This is a workaround for the migration from priorities to score plugins.
|
||||
// Historically the priority is designed to handle only one label, and multiple priorities are configured
|
||||
// to work with multiple labels. Using multiple plugins is not allowed in the new framework. Therefore
|
||||
// we need to modify the old priority to be able to handle multiple labels so that it can be mapped
|
||||
// to a single plugin. This will be deprecated soon.
|
||||
func (s *ServiceAntiAffinity) updateNodeScoresForLabel(sharedLister schedulerlisters.SharedLister, mapResult framework.NodeScoreList, reduceResult []float64, label string) error {
|
||||
var numServicePods int64
|
||||
var labelValue string
|
||||
podCounts := map[string]int64{}
|
||||
labelNodesStatus := map[string]string{}
|
||||
maxPriorityFloat64 := float64(framework.MaxNodeScore)
|
||||
|
||||
for _, hostPriority := range mapResult {
|
||||
numServicePods += hostPriority.Score
|
||||
nodeInfo, err := sharedLister.NodeInfos().Get(hostPriority.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !labels.Set(nodeInfo.Node().Labels).Has(label) {
|
||||
continue
|
||||
}
|
||||
|
||||
labelValue = labels.Set(nodeInfo.Node().Labels).Get(label)
|
||||
labelNodesStatus[hostPriority.Name] = labelValue
|
||||
podCounts[labelValue] += hostPriority.Score
|
||||
}
|
||||
|
||||
//score int - scale of 0-maxPriority
|
||||
// 0 being the lowest priority and maxPriority being the highest
|
||||
for i, hostPriority := range mapResult {
|
||||
labelValue, ok := labelNodesStatus[hostPriority.Name]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
// initializing to the default/max node score of maxPriority
|
||||
fScore := maxPriorityFloat64
|
||||
if numServicePods > 0 {
|
||||
fScore = maxPriorityFloat64 * (float64(numServicePods-podCounts[labelValue]) / float64(numServicePods))
|
||||
}
|
||||
// The score of current label only accounts for 1/len(s.labels) of the total score.
|
||||
// The policy API definition only allows a single label to be configured, associated with a weight.
|
||||
// This is compensated by the fact that the total weight is the sum of all weights configured
|
||||
// in each policy config.
|
||||
reduceResult[i] += fScore / float64(len(s.labels))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -607,196 +607,6 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestZoneSpreadPriority(t *testing.T) {
|
||||
labels1 := map[string]string{
|
||||
"foo": "bar",
|
||||
"baz": "blah",
|
||||
}
|
||||
labels2 := map[string]string{
|
||||
"bar": "foo",
|
||||
"baz": "blah",
|
||||
}
|
||||
zone1 := map[string]string{
|
||||
"zone": "zone1",
|
||||
}
|
||||
zone2 := map[string]string{
|
||||
"zone": "zone2",
|
||||
}
|
||||
nozone := map[string]string{
|
||||
"name": "value",
|
||||
}
|
||||
zone0Spec := v1.PodSpec{
|
||||
NodeName: "machine01",
|
||||
}
|
||||
zone1Spec := v1.PodSpec{
|
||||
NodeName: "machine11",
|
||||
}
|
||||
zone2Spec := v1.PodSpec{
|
||||
NodeName: "machine21",
|
||||
}
|
||||
labeledNodes := map[string]map[string]string{
|
||||
"machine01": nozone, "machine02": nozone,
|
||||
"machine11": zone1, "machine12": zone1,
|
||||
"machine21": zone2, "machine22": zone2,
|
||||
}
|
||||
tests := []struct {
|
||||
pod *v1.Pod
|
||||
pods []*v1.Pod
|
||||
nodes map[string]map[string]string
|
||||
services []*v1.Service
|
||||
expectedList framework.NodeScoreList
|
||||
name string
|
||||
}{
|
||||
{
|
||||
pod: new(v1.Pod),
|
||||
nodes: labeledNodes,
|
||||
expectedList: []framework.NodeScore{{Name: "machine11", Score: framework.MaxNodeScore}, {Name: "machine12", Score: framework.MaxNodeScore},
|
||||
{Name: "machine21", Score: framework.MaxNodeScore}, {Name: "machine22", Score: framework.MaxNodeScore},
|
||||
{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
|
||||
name: "nothing scheduled",
|
||||
},
|
||||
{
|
||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
pods: []*v1.Pod{{Spec: zone1Spec}},
|
||||
nodes: labeledNodes,
|
||||
expectedList: []framework.NodeScore{{Name: "machine11", Score: framework.MaxNodeScore}, {Name: "machine12", Score: framework.MaxNodeScore},
|
||||
{Name: "machine21", Score: framework.MaxNodeScore}, {Name: "machine22", Score: framework.MaxNodeScore},
|
||||
{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
|
||||
name: "no services",
|
||||
},
|
||||
{
|
||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
pods: []*v1.Pod{{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}}},
|
||||
nodes: labeledNodes,
|
||||
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
expectedList: []framework.NodeScore{{Name: "machine11", Score: framework.MaxNodeScore}, {Name: "machine12", Score: framework.MaxNodeScore},
|
||||
{Name: "machine21", Score: framework.MaxNodeScore}, {Name: "machine22", Score: framework.MaxNodeScore},
|
||||
{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
|
||||
name: "different services",
|
||||
},
|
||||
{
|
||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
pods: []*v1.Pod{
|
||||
{Spec: zone0Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
|
||||
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
|
||||
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []framework.NodeScore{{Name: "machine11", Score: framework.MaxNodeScore}, {Name: "machine12", Score: framework.MaxNodeScore},
|
||||
{Name: "machine21", Score: 0}, {Name: "machine22", Score: 0},
|
||||
{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
|
||||
name: "three pods, one service pod",
|
||||
},
|
||||
{
|
||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
pods: []*v1.Pod{
|
||||
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
|
||||
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []framework.NodeScore{{Name: "machine11", Score: 50}, {Name: "machine12", Score: 50},
|
||||
{Name: "machine21", Score: 50}, {Name: "machine22", Score: 50},
|
||||
{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
|
||||
name: "three pods, two service pods on different machines",
|
||||
},
|
||||
{
|
||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: metav1.NamespaceDefault}},
|
||||
pods: []*v1.Pod{
|
||||
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: metav1.NamespaceDefault}},
|
||||
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1, Namespace: "ns1"}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}, ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceDefault}}},
|
||||
expectedList: []framework.NodeScore{{Name: "machine11", Score: 0}, {Name: "machine12", Score: 0},
|
||||
{Name: "machine21", Score: framework.MaxNodeScore}, {Name: "machine22", Score: framework.MaxNodeScore},
|
||||
{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
|
||||
name: "three service label match pods in different namespaces",
|
||||
},
|
||||
{
|
||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
pods: []*v1.Pod{
|
||||
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
|
||||
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []framework.NodeScore{{Name: "machine11", Score: 66}, {Name: "machine12", Score: 66},
|
||||
{Name: "machine21", Score: 33}, {Name: "machine22", Score: 33},
|
||||
{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
|
||||
name: "four pods, three service pods",
|
||||
},
|
||||
{
|
||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
pods: []*v1.Pod{
|
||||
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels2}},
|
||||
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
expectedList: []framework.NodeScore{{Name: "machine11", Score: 33}, {Name: "machine12", Score: 33},
|
||||
{Name: "machine21", Score: 66}, {Name: "machine22", Score: 66},
|
||||
{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
|
||||
name: "service with partial pod label matches",
|
||||
},
|
||||
{
|
||||
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
pods: []*v1.Pod{
|
||||
{Spec: zone0Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
{Spec: zone1Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
{Spec: zone2Spec, ObjectMeta: metav1.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []framework.NodeScore{{Name: "machine11", Score: 75}, {Name: "machine12", Score: 75},
|
||||
{Name: "machine21", Score: 50}, {Name: "machine22", Score: 50},
|
||||
{Name: "machine01", Score: 0}, {Name: "machine02", Score: 0}},
|
||||
name: "service pod on non-zoned node",
|
||||
},
|
||||
}
|
||||
// these local variables just make sure controllerLister\replicaSetLister\statefulSetLister not nil
|
||||
// when construct metaDataProducer
|
||||
sss := []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}
|
||||
rcs := []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}
|
||||
rss := []*apps.ReplicaSet{{Spec: apps.ReplicaSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
nodes := makeLabeledNodeList(labeledNodes)
|
||||
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
|
||||
zoneSpread := ServiceAntiAffinity{podLister: snapshot.Pods(), serviceLister: fakelisters.ServiceLister(test.services), labels: []string{"zone"}}
|
||||
|
||||
metaDataProducer := NewMetadataFactory(
|
||||
fakelisters.ServiceLister(test.services),
|
||||
fakelisters.ControllerLister(rcs),
|
||||
fakelisters.ReplicaSetLister(rss),
|
||||
fakelisters.StatefulSetLister(sss),
|
||||
1,
|
||||
)
|
||||
metaData := metaDataProducer(test.pod, nodes, snapshot)
|
||||
list, err := runMapReducePriority(zoneSpread.CalculateAntiAffinityPriorityMap, zoneSpread.CalculateAntiAffinityPriorityReduce, metaData, test.pod, snapshot, makeLabeledNodeList(test.nodes))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// sort the two lists to avoid failures on account of different ordering
|
||||
sortNodeScoreList(test.expectedList)
|
||||
sortNodeScoreList(list)
|
||||
if !reflect.DeepEqual(test.expectedList, list) {
|
||||
t.Errorf("expected %#v, got %#v", test.expectedList, list)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func makeLabeledNodeList(nodeMap map[string]map[string]string) []*v1.Node {
|
||||
nodes := make([]*v1.Node, 0, len(nodeMap))
|
||||
for nodeName, labels := range nodeMap {
|
||||
|
@ -7,7 +7,6 @@ go_library(
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/scheduler/algorithm/predicates:go_default_library",
|
||||
"//pkg/scheduler/algorithm/priorities:go_default_library",
|
||||
"//pkg/scheduler/framework/plugins/migration:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//pkg/scheduler/listers:go_default_library",
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
|
||||
@ -80,27 +79,20 @@ func New(plArgs *runtime.Unknown, handle framework.FrameworkHandle) (framework.P
|
||||
return nil, err
|
||||
}
|
||||
informerFactory := handle.SharedInformerFactory()
|
||||
podLister := handle.SnapshotSharedLister().Pods()
|
||||
serviceLister := informerFactory.Core().V1().Services().Lister()
|
||||
|
||||
priorityMapFunction, priorityReduceFunction := priorities.NewServiceAntiAffinityPriority(podLister, serviceLister, args.AntiAffinityLabelsPreference)
|
||||
|
||||
return &ServiceAffinity{
|
||||
sharedLister: handle.SnapshotSharedLister(),
|
||||
serviceLister: serviceLister,
|
||||
priorityMapFunction: priorityMapFunction,
|
||||
priorityReduceFunction: priorityReduceFunction,
|
||||
args: args,
|
||||
sharedLister: handle.SnapshotSharedLister(),
|
||||
serviceLister: serviceLister,
|
||||
args: args,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ServiceAffinity is a plugin that checks service affinity.
|
||||
type ServiceAffinity struct {
|
||||
args Args
|
||||
sharedLister schedulerlisters.SharedLister
|
||||
serviceLister corelisters.ServiceLister
|
||||
priorityMapFunction priorities.PriorityMapFunction
|
||||
priorityReduceFunction priorities.PriorityReduceFunction
|
||||
args Args
|
||||
sharedLister schedulerlisters.SharedLister
|
||||
serviceLister corelisters.ServiceLister
|
||||
}
|
||||
|
||||
var _ framework.PreFilterPlugin = &ServiceAffinity{}
|
||||
@ -293,16 +285,105 @@ func (pl *ServiceAffinity) Score(ctx context.Context, state *framework.CycleStat
|
||||
if err != nil {
|
||||
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
|
||||
}
|
||||
meta := migration.PriorityMetadata(state)
|
||||
s, err := pl.priorityMapFunction(pod, meta, nodeInfo)
|
||||
return s.Score, migration.ErrorToFrameworkStatus(err)
|
||||
|
||||
node := nodeInfo.Node()
|
||||
if node == nil {
|
||||
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("node not found"))
|
||||
}
|
||||
|
||||
// Pods matched namespace,selector on current node.
|
||||
var selector labels.Selector
|
||||
if services, err := schedulerlisters.GetPodServices(pl.serviceLister, pod); err == nil && len(services) > 0 {
|
||||
selector = labels.SelectorFromSet(services[0].Spec.Selector)
|
||||
} else {
|
||||
selector = labels.NewSelector()
|
||||
}
|
||||
|
||||
if len(nodeInfo.Pods()) == 0 || selector.Empty() {
|
||||
return 0, nil
|
||||
}
|
||||
var score int64
|
||||
for _, existingPod := range nodeInfo.Pods() {
|
||||
// Ignore pods being deleted for spreading purposes
|
||||
// Similar to how it is done for SelectorSpreadPriority
|
||||
if pod.Namespace == existingPod.Namespace && existingPod.DeletionTimestamp == nil {
|
||||
if selector.Matches(labels.Set(existingPod.Labels)) {
|
||||
score++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return score, nil
|
||||
}
|
||||
|
||||
// NormalizeScore invoked after scoring all nodes.
|
||||
func (pl *ServiceAffinity) NormalizeScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
|
||||
// Note that priorityReduceFunction doesn't use priority metadata, hence passing nil here.
|
||||
err := pl.priorityReduceFunction(pod, nil, pl.sharedLister, scores)
|
||||
return migration.ErrorToFrameworkStatus(err)
|
||||
reduceResult := make([]float64, len(scores))
|
||||
for _, label := range pl.args.AntiAffinityLabelsPreference {
|
||||
if err := pl.updateNodeScoresForLabel(pl.sharedLister, scores, reduceResult, label); err != nil {
|
||||
return framework.NewStatus(framework.Error, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Update the result after all labels have been evaluated.
|
||||
for i, nodeScore := range reduceResult {
|
||||
scores[i].Score = int64(nodeScore)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateNodeScoresForLabel updates the node scores for a single label. Note it does not update the
|
||||
// original result from the map phase directly, but instead updates the reduceResult, which is used
|
||||
// to update the original result finally. This makes sure that each call to updateNodeScoresForLabel
|
||||
// receives the same mapResult to work with.
|
||||
// Why are doing this? This is a workaround for the migration from priorities to score plugins.
|
||||
// Historically the priority is designed to handle only one label, and multiple priorities are configured
|
||||
// to work with multiple labels. Using multiple plugins is not allowed in the new framework. Therefore
|
||||
// we need to modify the old priority to be able to handle multiple labels so that it can be mapped
|
||||
// to a single plugin.
|
||||
// TODO: This will be deprecated soon.
|
||||
func (pl *ServiceAffinity) updateNodeScoresForLabel(sharedLister schedulerlisters.SharedLister, mapResult framework.NodeScoreList, reduceResult []float64, label string) error {
|
||||
var numServicePods int64
|
||||
var labelValue string
|
||||
podCounts := map[string]int64{}
|
||||
labelNodesStatus := map[string]string{}
|
||||
maxPriorityFloat64 := float64(framework.MaxNodeScore)
|
||||
|
||||
for _, nodePriority := range mapResult {
|
||||
numServicePods += nodePriority.Score
|
||||
nodeInfo, err := sharedLister.NodeInfos().Get(nodePriority.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !labels.Set(nodeInfo.Node().Labels).Has(label) {
|
||||
continue
|
||||
}
|
||||
|
||||
labelValue = labels.Set(nodeInfo.Node().Labels).Get(label)
|
||||
labelNodesStatus[nodePriority.Name] = labelValue
|
||||
podCounts[labelValue] += nodePriority.Score
|
||||
}
|
||||
|
||||
//score int - scale of 0-maxPriority
|
||||
// 0 being the lowest priority and maxPriority being the highest
|
||||
for i, nodePriority := range mapResult {
|
||||
labelValue, ok := labelNodesStatus[nodePriority.Name]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
// initializing to the default/max node score of maxPriority
|
||||
fScore := maxPriorityFloat64
|
||||
if numServicePods > 0 {
|
||||
fScore = maxPriorityFloat64 * (float64(numServicePods-podCounts[labelValue]) / float64(numServicePods))
|
||||
}
|
||||
// The score of current label only accounts for 1/len(s.labels) of the total score.
|
||||
// The policy API definition only allows a single label to be configured, associated with a weight.
|
||||
// This is compensated by the fact that the total weight is the sum of all weights configured
|
||||
// in each policy config.
|
||||
reduceResult[i] += fScore / float64(len(pl.args.AntiAffinityLabelsPreference))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ScoreExtensions of the Score plugin.
|
||||
|
@ -391,13 +391,13 @@ func TestServiceAffinityScore(t *testing.T) {
|
||||
nodes := makeLabeledNodeList(test.nodes)
|
||||
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
|
||||
serviceLister := fakelisters.ServiceLister(test.services)
|
||||
priorityMapFunction, priorityReduceFunction := priorities.NewServiceAntiAffinityPriority(snapshot.Pods(), serviceLister, test.labels)
|
||||
|
||||
p := &ServiceAffinity{
|
||||
sharedLister: snapshot,
|
||||
serviceLister: serviceLister,
|
||||
priorityMapFunction: priorityMapFunction,
|
||||
priorityReduceFunction: priorityReduceFunction,
|
||||
sharedLister: snapshot,
|
||||
serviceLister: serviceLister,
|
||||
args: Args{
|
||||
AntiAffinityLabelsPreference: test.labels,
|
||||
},
|
||||
}
|
||||
metaDataProducer := priorities.NewMetadataFactory(
|
||||
fakelisters.ServiceLister(test.services),
|
||||
|
Loading…
Reference in New Issue
Block a user