mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 18:00:08 +00:00
Merge pull request #34624 from jayunit100/scheduler-checksvc-1
Automatic merge from submit-queue. Minor cleanup to scheduler. Part 1 of the PR for scheduler modularization, specifically supporting CheckServiceAffinity improvements (#33763). These are some innocuous changes which help separate the predicate injection machinery from the cleanup parts of CheckServiceAffinity. @timothysc @wojtek-t - changes to metadata.go are for the follow-on PR - changes to the prio/pred stuff are all passing unit tests; hoping to grease this through, and then the interesting part is in the follow-on PR.
This commit is contained in:
commit
f42337f9e8
@ -637,38 +637,36 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al
|
|||||||
return affinity.CheckServiceAffinity
|
return affinity.CheckServiceAffinity
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckServiceAffinity ensures that only the nodes that match the specified labels are considered for scheduling.
|
// The checkServiceAffinity predicate matches nodes in such a way to force that
|
||||||
// The set of labels to be considered are provided to the struct (ServiceAffinity).
|
// ServiceAffinity.labels are homogenous for pods added to a node.
|
||||||
// The pod is checked for the labels and any missing labels are then checked in the node
|
// (i.e. it returns true IFF this pod can be added to this node, such
|
||||||
// that hosts the service pods (peers) for the given pod.
|
// that all other pods in the same service are running on nodes w/
|
||||||
|
// the exact same ServiceAffinity.label values).
|
||||||
//
|
//
|
||||||
// We add an implicit selector requiring some particular value V for label L to a pod, if:
|
// Details:
|
||||||
// - L is listed in the ServiceAffinity object that is passed into the function
|
//
|
||||||
// - the pod does not have any NodeSelector for L
|
// If (the svc affinity labels are all present in the pod's node selector)
|
||||||
// - some other pod from the same service is already scheduled onto a node that has value V for label L
|
// The pod has all information necessary to check affinity, the pod's label selector is sufficient to calculate
|
||||||
|
// the match.
|
||||||
|
// Otherwise:
|
||||||
|
// Create an "implicit selector" which guarantees pods will land on nodes with similar values
|
||||||
|
// for the affinity labels.
|
||||||
|
// To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace.
|
||||||
|
// These backfilled labels in the selector "L" are defined like so:
|
||||||
|
// - L is a label that the ServiceAffinity object needs as a matching constraints.
|
||||||
|
// - L is not defined in the pod itself already.
|
||||||
|
// - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value.
|
||||||
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
|
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
|
||||||
node := nodeInfo.Node()
|
node := nodeInfo.Node()
|
||||||
if node == nil {
|
if node == nil {
|
||||||
return false, nil, fmt.Errorf("node not found")
|
return false, nil, fmt.Errorf("node not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
var affinitySelector labels.Selector
|
|
||||||
|
|
||||||
// check if the pod being scheduled has the affinity labels specified in its NodeSelector
|
// check if the pod being scheduled has the affinity labels specified in its NodeSelector
|
||||||
affinityLabels := map[string]string{}
|
affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector))
|
||||||
nodeSelector := labels.Set(pod.Spec.NodeSelector)
|
|
||||||
labelsExist := true
|
|
||||||
for _, l := range s.labels {
|
|
||||||
if nodeSelector.Has(l) {
|
|
||||||
affinityLabels[l] = nodeSelector.Get(l)
|
|
||||||
} else {
|
|
||||||
// the current pod does not specify all the labels, look in the existing service pods
|
|
||||||
labelsExist = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// skip looking at other pods in the service if the current pod defines all the required affinity labels
|
// Introspect services IFF we didn't predefine all the affinity labels in the pod itself.
|
||||||
if !labelsExist {
|
if len(s.labels) > len(affinityLabels) {
|
||||||
services, err := s.serviceLister.GetPodServices(pod)
|
services, err := s.serviceLister.GetPodServices(pod)
|
||||||
if err == nil && len(services) > 0 {
|
if err == nil && len(services) > 0 {
|
||||||
// just use the first service and get the other pods within the service
|
// just use the first service and get the other pods within the service
|
||||||
@ -679,40 +677,20 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n
|
|||||||
return false, nil, err
|
return false, nil, err
|
||||||
}
|
}
|
||||||
// consider only the pods that belong to the same namespace
|
// consider only the pods that belong to the same namespace
|
||||||
nsServicePods := []*api.Pod{}
|
nsServicePods := FilterPodsByNamespace(servicePods, pod.Namespace)
|
||||||
for _, nsPod := range servicePods {
|
|
||||||
if nsPod.Namespace == pod.Namespace {
|
|
||||||
nsServicePods = append(nsServicePods, nsPod)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(nsServicePods) > 0 {
|
if len(nsServicePods) > 0 {
|
||||||
// consider any service pod and fetch the node it's hosted on
|
// consider any service pod and fetch the node it's hosted on
|
||||||
otherNode, err := s.nodeInfo.GetNodeInfo(nsServicePods[0].Spec.NodeName)
|
otherNode, err := s.nodeInfo.GetNodeInfo(nsServicePods[0].Spec.NodeName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, nil, err
|
return false, nil, err
|
||||||
}
|
}
|
||||||
for _, l := range s.labels {
|
AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(otherNode.Labels))
|
||||||
// If the pod being scheduled has the label value specified, do not override it
|
|
||||||
if _, exists := affinityLabels[l]; exists {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if labels.Set(otherNode.Labels).Has(l) {
|
|
||||||
affinityLabels[l] = labels.Set(otherNode.Labels).Get(l)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if there are no existing pods in the service, consider all nodes
|
|
||||||
if len(affinityLabels) == 0 {
|
|
||||||
affinitySelector = labels.Everything()
|
|
||||||
} else {
|
|
||||||
affinitySelector = labels.Set(affinityLabels).AsSelector()
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if the node matches the selector
|
// check if the node matches the selector
|
||||||
if affinitySelector.Matches(labels.Set(node.Labels)) {
|
if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
|
||||||
return true, nil, nil
|
return true, nil, nil
|
||||||
}
|
}
|
||||||
return false, []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}, nil
|
return false, []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}, nil
|
||||||
|
64
plugin/pkg/scheduler/algorithm/predicates/utils.go
Normal file
64
plugin/pkg/scheduler/algorithm/predicates/utils.go
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package predicates
|
||||||
|
|
||||||
|
import "k8s.io/kubernetes/pkg/labels"
|
||||||
|
import "k8s.io/kubernetes/pkg/api"
|
||||||
|
|
||||||
|
// FindLabelsInSet gets as many key/value pairs as possible out of a label set.
|
||||||
|
func FindLabelsInSet(labelsToKeep []string, selector labels.Set) map[string]string {
|
||||||
|
aL := make(map[string]string)
|
||||||
|
for _, l := range labelsToKeep {
|
||||||
|
if selector.Has(l) {
|
||||||
|
aL[l] = selector.Get(l)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return aL
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddUnsetLabelsToMap backfills missing values with values we find in a map.
|
||||||
|
func AddUnsetLabelsToMap(aL map[string]string, labelsToAdd []string, labelSet labels.Set) {
|
||||||
|
for _, l := range labelsToAdd {
|
||||||
|
// if the label is already there, dont overwrite it.
|
||||||
|
if _, exists := aL[l]; exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// otherwise, backfill this label.
|
||||||
|
if labelSet.Has(l) {
|
||||||
|
aL[l] = labelSet.Get(l)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FilterPodsByNamespace filters pods outside a namespace from the given list.
|
||||||
|
func FilterPodsByNamespace(pods []*api.Pod, ns string) []*api.Pod {
|
||||||
|
filtered := []*api.Pod{}
|
||||||
|
for _, nsPod := range pods {
|
||||||
|
if nsPod.Namespace == ns {
|
||||||
|
filtered = append(filtered, nsPod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return filtered
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateSelectorFromLabels is used to define a selector that corresponds to the keys in a map.
|
||||||
|
func CreateSelectorFromLabels(aL map[string]string) labels.Selector {
|
||||||
|
if aL == nil || len(aL) == 0 {
|
||||||
|
return labels.Everything()
|
||||||
|
}
|
||||||
|
return labels.Set(aL).AsSelector()
|
||||||
|
}
|
65
plugin/pkg/scheduler/algorithm/predicates/utils_test.go
Normal file
65
plugin/pkg/scheduler/algorithm/predicates/utils_test.go
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package predicates
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ExampleUtils is a https://blog.golang.org/examples styled unit test.
|
||||||
|
func ExampleFindLabelsInSet() {
|
||||||
|
labelSubset := labels.Set{}
|
||||||
|
labelSubset["label1"] = "value1"
|
||||||
|
labelSubset["label2"] = "value2"
|
||||||
|
// Lets make believe that these pods are on the cluster.
|
||||||
|
// Utility functions will inspect their labels, filter them, and so on.
|
||||||
|
nsPods := []*api.Pod{
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "pod1",
|
||||||
|
Namespace: "ns1",
|
||||||
|
Labels: map[string]string{
|
||||||
|
"label1": "wontSeeThis",
|
||||||
|
"label2": "wontSeeThis",
|
||||||
|
"label3": "will_see_this",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, // first pod which will be used via the utilities
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "pod2",
|
||||||
|
Namespace: "ns1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
{}, // a third pod which will have no effect on anything.
|
||||||
|
}
|
||||||
|
fmt.Println(FindLabelsInSet([]string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)["label3"])
|
||||||
|
AddUnsetLabelsToMap(labelSubset, []string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)
|
||||||
|
fmt.Println(labelSubset)
|
||||||
|
|
||||||
|
for _, pod := range FilterPodsByNamespace(nsPods, "ns1") {
|
||||||
|
fmt.Print(pod.Name, ",")
|
||||||
|
}
|
||||||
|
// Output:
|
||||||
|
// will_see_this
|
||||||
|
// label1=value1,label2=value2,label3=will_see_this
|
||||||
|
// pod1,pod2,
|
||||||
|
}
|
@ -28,7 +28,8 @@ type priorityMetadata struct {
|
|||||||
affinity *api.Affinity
|
affinity *api.Affinity
|
||||||
}
|
}
|
||||||
|
|
||||||
func PriorityMetadata(pod *api.Pod) interface{} {
|
// PriorityMetadata is a MetadataProducer. Node info can be nil.
|
||||||
|
func PriorityMetadata(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
|
||||||
// If we cannot compute metadata, just return nil
|
// If we cannot compute metadata, just return nil
|
||||||
if pod == nil {
|
if pod == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -39,7 +39,7 @@ type PriorityMapFunction func(pod *api.Pod, meta interface{}, nodeInfo *schedule
|
|||||||
type PriorityReduceFunction func(pod *api.Pod, result schedulerapi.HostPriorityList) error
|
type PriorityReduceFunction func(pod *api.Pod, result schedulerapi.HostPriorityList) error
|
||||||
|
|
||||||
// MetadataProducer is a function that computes metadata for a given pod.
|
// MetadataProducer is a function that computes metadata for a given pod.
|
||||||
type MetadataProducer func(pod *api.Pod) interface{}
|
type MetadataProducer func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{}
|
||||||
|
|
||||||
// DEPRECATED
|
// DEPRECATED
|
||||||
// Use Map-Reduce pattern for priority functions.
|
// Use Map-Reduce pattern for priority functions.
|
||||||
@ -54,7 +54,7 @@ type PriorityConfig struct {
|
|||||||
Weight int
|
Weight int
|
||||||
}
|
}
|
||||||
|
|
||||||
func EmptyMetadataProducer(pod *api.Pod) interface{} {
|
func EmptyMetadataProducer(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,8 +113,8 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
|
|||||||
}
|
}
|
||||||
|
|
||||||
trace.Step("Prioritizing")
|
trace.Step("Prioritizing")
|
||||||
meta := g.priorityMetaProducer(pod)
|
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
|
||||||
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, meta, g.prioritizers, filteredNodes, g.extenders)
|
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user