mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
compute pod selectors in priority meta data producer
This commit is contained in:
parent
4aa92bac73
commit
bed435deca
@ -253,7 +253,7 @@ func TestBalancedResourceAllocation(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
|
||||
list, err := priorityFunction(BalancedResourceAllocationMap, nil)(test.pod, nodeNameToInfo, test.nodes)
|
||||
list, err := priorityFunction(BalancedResourceAllocationMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -161,7 +161,7 @@ func TestImageLocalityPriority(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
|
||||
list, err := priorityFunction(ImageLocalityPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
|
||||
list, err := priorityFunction(ImageLocalityPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -253,7 +253,7 @@ func TestLeastRequested(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
|
||||
list, err := priorityFunction(LeastRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
|
||||
list, err := priorityFunction(LeastRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -18,26 +18,79 @@ package priorities
|
||||
|
||||
import (
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||
)
|
||||
|
||||
type PriorityMetadataFactory struct {
|
||||
serviceLister algorithm.ServiceLister
|
||||
controllerLister algorithm.ControllerLister
|
||||
replicaSetLister algorithm.ReplicaSetLister
|
||||
statefulSetLister algorithm.StatefulSetLister
|
||||
}
|
||||
|
||||
func NewPriorityMetadataFactory(serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister, replicaSetLister algorithm.ReplicaSetLister, statefulSetLister algorithm.StatefulSetLister) algorithm.MetadataProducer {
|
||||
factory := &PriorityMetadataFactory{
|
||||
serviceLister: serviceLister,
|
||||
controllerLister: controllerLister,
|
||||
replicaSetLister: replicaSetLister,
|
||||
statefulSetLister: statefulSetLister,
|
||||
}
|
||||
return factory.PriorityMetadata
|
||||
}
|
||||
|
||||
// priorityMetadata is a type that is passed as metadata for priority functions
|
||||
type priorityMetadata struct {
|
||||
nonZeroRequest *schedulercache.Resource
|
||||
podTolerations []v1.Toleration
|
||||
affinity *v1.Affinity
|
||||
podSelectors []labels.Selector
|
||||
}
|
||||
|
||||
// PriorityMetadata is a MetadataProducer. Node info can be nil.
|
||||
func PriorityMetadata(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
|
||||
func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
|
||||
// If we cannot compute metadata, just return nil
|
||||
if pod == nil {
|
||||
return nil
|
||||
}
|
||||
tolerationsPreferNoSchedule := getAllTolerationPreferNoSchedule(pod.Spec.Tolerations)
|
||||
podSelectors := getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister)
|
||||
return &priorityMetadata{
|
||||
nonZeroRequest: getNonZeroRequests(pod),
|
||||
podTolerations: tolerationsPreferNoSchedule,
|
||||
affinity: pod.Spec.Affinity,
|
||||
podSelectors: podSelectors,
|
||||
}
|
||||
}
|
||||
|
||||
// getSelectors returns selectors of services, RCs and RSs matching the given pod.
|
||||
func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector {
|
||||
var selectors []labels.Selector
|
||||
if services, err := sl.GetPodServices(pod); err == nil {
|
||||
for _, service := range services {
|
||||
selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
|
||||
}
|
||||
}
|
||||
if rcs, err := cl.GetPodControllers(pod); err == nil {
|
||||
for _, rc := range rcs {
|
||||
selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
|
||||
}
|
||||
}
|
||||
if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
|
||||
for _, rs := range rss {
|
||||
if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
|
||||
selectors = append(selectors, selector)
|
||||
}
|
||||
}
|
||||
}
|
||||
if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
|
||||
for _, ss := range sss {
|
||||
if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
|
||||
selectors = append(selectors, selector)
|
||||
}
|
||||
}
|
||||
}
|
||||
return selectors
|
||||
}
|
||||
|
@ -20,11 +20,14 @@ import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
apps "k8s.io/api/apps/v1beta1"
|
||||
"k8s.io/api/core/v1"
|
||||
extensions "k8s.io/api/extensions/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
|
||||
)
|
||||
|
||||
func TestPriorityMetadata(t *testing.T) {
|
||||
@ -123,8 +126,13 @@ func TestPriorityMetadata(t *testing.T) {
|
||||
test: "Produce a priorityMetadata with specified requests",
|
||||
},
|
||||
}
|
||||
mataDataProducer := NewPriorityMetadataFactory(
|
||||
schedulertesting.FakeServiceLister([]*v1.Service{}),
|
||||
schedulertesting.FakeControllerLister([]*v1.ReplicationController{}),
|
||||
schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}),
|
||||
schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{}))
|
||||
for _, test := range tests {
|
||||
ptData := PriorityMetadata(test.pod, nil)
|
||||
ptData := mataDataProducer(test.pod, nil)
|
||||
if !reflect.DeepEqual(test.expected, ptData) {
|
||||
t.Errorf("%s: expected %#v, got %#v", test.test, test.expected, ptData)
|
||||
}
|
||||
|
@ -210,7 +210,7 @@ func TestMostRequested(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
|
||||
list, err := priorityFunction(MostRequestedPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
|
||||
list, err := priorityFunction(MostRequestedPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -167,7 +167,7 @@ func TestNodeAffinityPriority(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
|
||||
nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce)
|
||||
nap := priorityFunction(CalculateNodeAffinityPriorityMap, CalculateNodeAffinityPriorityReduce, nil)
|
||||
list, err := nap(test.pod, nodeNameToInfo, test.nodes)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
@ -108,7 +108,11 @@ func TestNewNodeLabelPriority(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
|
||||
list, err := priorityFunction(NewNodeLabelPriority(test.label, test.presence))(nil, nodeNameToInfo, test.nodes)
|
||||
labelPrioritizer := &NodeLabelPrioritizer{
|
||||
label: test.label,
|
||||
presence: test.presence,
|
||||
}
|
||||
list, err := priorityFunction(labelPrioritizer.CalculateNodeLabelPriorityMap, nil, nil)(nil, nodeNameToInfo, test.nodes)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -142,7 +142,7 @@ func TestNodePreferAvoidPriority(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
|
||||
list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil)(test.pod, nodeNameToInfo, test.nodes)
|
||||
list, err := priorityFunction(CalculateNodePreferAvoidPodsPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -338,17 +338,26 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
|
||||
for i, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, makeNodeList(test.nodes))
|
||||
selectorSpread := SelectorSpread{
|
||||
serviceLister: schedulertesting.FakeServiceLister(test.services),
|
||||
controllerLister: schedulertesting.FakeControllerLister(test.rcs),
|
||||
replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss),
|
||||
statefulSetLister: schedulertesting.FakeStatefulSetLister(test.sss),
|
||||
}
|
||||
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeNodeList(test.nodes))
|
||||
|
||||
mataDataProducer := NewPriorityMetadataFactory(
|
||||
schedulertesting.FakeServiceLister(test.services),
|
||||
schedulertesting.FakeControllerLister(test.rcs),
|
||||
schedulertesting.FakeReplicaSetLister(test.rss),
|
||||
schedulertesting.FakeStatefulSetLister(test.sss))
|
||||
mataData := mataDataProducer(test.pod, nodeNameToInfo)
|
||||
|
||||
ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, mataData)
|
||||
list, err := ttp(test.pod, nodeNameToInfo, makeNodeList(test.nodes))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
t.Errorf("unexpected error: %v index : %d\n", err, i)
|
||||
}
|
||||
if !reflect.DeepEqual(test.expectedList, list) {
|
||||
t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list)
|
||||
@ -544,6 +553,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
buildPod(nodeMachine1Zone2, labels1, controllerRef("ReplicationController", "name", "abc123")),
|
||||
buildPod(nodeMachine1Zone3, labels1, controllerRef("ReplicationController", "name", "abc123")),
|
||||
},
|
||||
//nodes: []string{nodeMachine1Zone3, nodeMachine1Zone2, nodeMachine1Zone3},
|
||||
rcs: []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
// Note that because we put two pods on the same node (nodeMachine1Zone3),
|
||||
@ -564,17 +574,25 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
|
||||
for i, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, makeLabeledNodeList(labeledNodes))
|
||||
selectorSpread := SelectorSpread{
|
||||
serviceLister: schedulertesting.FakeServiceLister(test.services),
|
||||
controllerLister: schedulertesting.FakeControllerLister(test.rcs),
|
||||
replicaSetLister: schedulertesting.FakeReplicaSetLister(test.rss),
|
||||
statefulSetLister: schedulertesting.FakeStatefulSetLister(test.sss),
|
||||
}
|
||||
list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes))
|
||||
|
||||
mataDataProducer := NewPriorityMetadataFactory(
|
||||
schedulertesting.FakeServiceLister(test.services),
|
||||
schedulertesting.FakeControllerLister(test.rcs),
|
||||
schedulertesting.FakeReplicaSetLister(test.rss),
|
||||
schedulertesting.FakeStatefulSetLister(test.sss))
|
||||
mataData := mataDataProducer(test.pod, nodeNameToInfo)
|
||||
ttp := priorityFunction(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, mataData)
|
||||
list, err := ttp(test.pod, nodeNameToInfo, makeLabeledNodeList(labeledNodes))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
t.Errorf("unexpected error: %v index : %d", err, i)
|
||||
}
|
||||
// sort the two lists to avoid failures on account of different ordering
|
||||
sort.Sort(test.expectedList)
|
||||
|
@ -227,7 +227,7 @@ func TestTaintAndToleration(t *testing.T) {
|
||||
}
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
|
||||
ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce)
|
||||
ttp := priorityFunction(ComputeTaintTolerationPriorityMap, ComputeTaintTolerationPriorityReduce, nil)
|
||||
list, err := ttp(test.pod, nodeNameToInfo, test.nodes)
|
||||
if err != nil {
|
||||
t.Errorf("%s, unexpected error: %v", test.test, err)
|
||||
|
@ -41,18 +41,18 @@ func makeNode(node string, milliCPU, memory int64) *v1.Node {
|
||||
}
|
||||
}
|
||||
|
||||
func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction) algorithm.PriorityFunction {
|
||||
func priorityFunction(mapFn algorithm.PriorityMapFunction, reduceFn algorithm.PriorityReduceFunction, mataData interface{}) algorithm.PriorityFunction {
|
||||
return func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
|
||||
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
|
||||
for i := range nodes {
|
||||
hostResult, err := mapFn(pod, nil, nodeNameToInfo[nodes[i].Name])
|
||||
hostResult, err := mapFn(pod, mataData, nodeNameToInfo[nodes[i].Name])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, hostResult)
|
||||
}
|
||||
if reduceFn != nil {
|
||||
if err := reduceFn(pod, nil, nodeNameToInfo, result); err != nil {
|
||||
if err := reduceFn(pod, mataData, nodeNameToInfo, result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ func init() {
|
||||
})
|
||||
factory.RegisterPriorityMetadataProducerFactory(
|
||||
func(args factory.PluginFactoryArgs) algorithm.MetadataProducer {
|
||||
return priorities.PriorityMetadata
|
||||
return priorities.NewPriorityMetadataFactory(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
|
||||
})
|
||||
|
||||
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
|
||||
@ -90,13 +90,12 @@ func init() {
|
||||
factory.RegisterPriorityConfigFactory(
|
||||
"ServiceSpreadingPriority",
|
||||
factory.PriorityConfigFactory{
|
||||
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
|
||||
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
|
||||
return priorities.NewSelectorSpreadPriority(args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}, algorithm.EmptyStatefulSetLister{})
|
||||
},
|
||||
Weight: 1,
|
||||
},
|
||||
)
|
||||
|
||||
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
|
||||
// Register the priority function so that its available
|
||||
// but do not include it as part of the default priorities
|
||||
@ -213,12 +212,13 @@ func defaultPriorities() sets.String {
|
||||
factory.RegisterPriorityConfigFactory(
|
||||
"SelectorSpreadPriority",
|
||||
factory.PriorityConfigFactory{
|
||||
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
|
||||
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
|
||||
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
|
||||
},
|
||||
Weight: 1,
|
||||
},
|
||||
),
|
||||
|
||||
// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
|
||||
// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
|
||||
factory.RegisterPriorityConfigFactory(
|
||||
|
@ -522,18 +522,26 @@ func TestZeroRequest(t *testing.T) {
|
||||
priorityConfigs := []algorithm.PriorityConfig{
|
||||
{Map: algorithmpriorities.LeastRequestedPriorityMap, Weight: 1},
|
||||
{Map: algorithmpriorities.BalancedResourceAllocationMap, Weight: 1},
|
||||
{
|
||||
Function: algorithmpriorities.NewSelectorSpreadPriority(
|
||||
schedulertesting.FakeServiceLister([]*v1.Service{}),
|
||||
schedulertesting.FakeControllerLister([]*v1.ReplicationController{}),
|
||||
schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}),
|
||||
schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{})),
|
||||
Weight: 1,
|
||||
},
|
||||
}
|
||||
selectorSpreadPriorityMap, selectorSpreadPriorityReduce := algorithmpriorities.NewSelectorSpreadPriority(
|
||||
schedulertesting.FakeServiceLister([]*v1.Service{}),
|
||||
schedulertesting.FakeControllerLister([]*v1.ReplicationController{}),
|
||||
schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}),
|
||||
schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{}))
|
||||
pc := algorithm.PriorityConfig{Map: selectorSpreadPriorityMap, Reduce: selectorSpreadPriorityReduce, Weight: 1}
|
||||
priorityConfigs = append(priorityConfigs, pc)
|
||||
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
|
||||
|
||||
mataDataProducer := algorithmpriorities.NewPriorityMetadataFactory(
|
||||
schedulertesting.FakeServiceLister([]*v1.Service{}),
|
||||
schedulertesting.FakeControllerLister([]*v1.ReplicationController{}),
|
||||
schedulertesting.FakeReplicaSetLister([]*extensions.ReplicaSet{}),
|
||||
schedulertesting.FakeStatefulSetLister([]*apps.StatefulSet{}))
|
||||
mataData := mataDataProducer(test.pod, nodeNameToInfo)
|
||||
|
||||
list, err := PrioritizeNodes(
|
||||
test.pod, nodeNameToInfo, algorithm.EmptyMetadataProducer, priorityConfigs,
|
||||
test.pod, nodeNameToInfo, mataData, priorityConfigs,
|
||||
schedulertesting.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{})
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
Loading…
Reference in New Issue
Block a user