mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 23:15:14 +00:00
Refactoring of priority function(CaculateSpreadPriority) by using map/reduce pattern
This commit is contained in:
parent
ce910f249d
commit
4aa92bac73
@ -17,12 +17,10 @@ limitations under the License.
|
||||
package priorities
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"fmt"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
utilnode "k8s.io/kubernetes/pkg/util/node"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
|
||||
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
||||
@ -46,149 +44,135 @@ func NewSelectorSpreadPriority(
|
||||
serviceLister algorithm.ServiceLister,
|
||||
controllerLister algorithm.ControllerLister,
|
||||
replicaSetLister algorithm.ReplicaSetLister,
|
||||
statefulSetLister algorithm.StatefulSetLister) algorithm.PriorityFunction {
|
||||
statefulSetLister algorithm.StatefulSetLister) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
|
||||
selectorSpread := &SelectorSpread{
|
||||
serviceLister: serviceLister,
|
||||
controllerLister: controllerLister,
|
||||
replicaSetLister: replicaSetLister,
|
||||
statefulSetLister: statefulSetLister,
|
||||
}
|
||||
return selectorSpread.CalculateSpreadPriority
|
||||
return selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce
|
||||
}
|
||||
|
||||
// Returns selectors of services, RCs and RSs matching the given pod.
|
||||
func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector {
|
||||
var selectors []labels.Selector
|
||||
if services, err := sl.GetPodServices(pod); err == nil {
|
||||
for _, service := range services {
|
||||
selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
|
||||
}
|
||||
}
|
||||
if rcs, err := cl.GetPodControllers(pod); err == nil {
|
||||
for _, rc := range rcs {
|
||||
selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
|
||||
}
|
||||
}
|
||||
if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
|
||||
for _, rs := range rss {
|
||||
if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
|
||||
selectors = append(selectors, selector)
|
||||
}
|
||||
}
|
||||
}
|
||||
if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
|
||||
for _, ss := range sss {
|
||||
if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
|
||||
selectors = append(selectors, selector)
|
||||
}
|
||||
}
|
||||
}
|
||||
return selectors
|
||||
}
|
||||
|
||||
func (s *SelectorSpread) getSelectors(pod *v1.Pod) []labels.Selector {
|
||||
return getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
|
||||
}
|
||||
|
||||
// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
|
||||
// CalculateSpreadPriorityMap spreads pods across hosts, considering pods belonging to the same service or replication controller.
|
||||
// When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors.
|
||||
// It favors nodes that have fewer existing matching pods.
|
||||
// i.e. it pushes the scheduler towards a node where there's the smallest number of
|
||||
// pods which match the same service, RC or RS selectors as the pod being scheduled.
|
||||
// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
|
||||
func (s *SelectorSpread) CalculateSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
|
||||
selectors := s.getSelectors(pod)
|
||||
func (s *SelectorSpread) CalculateSpreadPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
|
||||
var selectors []labels.Selector
|
||||
node := nodeInfo.Node()
|
||||
if node == nil {
|
||||
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
|
||||
}
|
||||
|
||||
// Count similar pods by node
|
||||
countsByNodeName := make(map[string]float64, len(nodes))
|
||||
countsByZone := make(map[string]float64, 10)
|
||||
maxCountByNodeName := float64(0)
|
||||
countsByNodeNameLock := sync.Mutex{}
|
||||
priorityMeta, ok := meta.(*priorityMetadata)
|
||||
if ok {
|
||||
selectors = priorityMeta.podSelectors
|
||||
} else {
|
||||
selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
|
||||
}
|
||||
|
||||
if len(selectors) == 0 {
|
||||
return schedulerapi.HostPriority{
|
||||
Host: node.Name,
|
||||
Score: int(0),
|
||||
}, nil
|
||||
}
|
||||
|
||||
count := float64(0)
|
||||
for _, nodePod := range nodeInfo.Pods() {
|
||||
if pod.Namespace != nodePod.Namespace {
|
||||
continue
|
||||
}
|
||||
// When we are replacing a failed pod, we often see the previous
|
||||
// deleted version while scheduling the replacement.
|
||||
// Ignore the previous deleted version for spreading purposes
|
||||
// (it can still be considered for resource restrictions etc.)
|
||||
if nodePod.DeletionTimestamp != nil {
|
||||
glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
|
||||
continue
|
||||
}
|
||||
matches := false
|
||||
for _, selector := range selectors {
|
||||
if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
|
||||
matches = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if matches {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return schedulerapi.HostPriority{
|
||||
Host: node.Name,
|
||||
Score: int(count),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CalculateSpreadPriorityReduce calculates the source of each node based on the number of existing matching pods on the node
|
||||
// where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
|
||||
func (s *SelectorSpread) CalculateSpreadPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
|
||||
var selectors []labels.Selector
|
||||
countsByZone := make(map[string]int, 10)
|
||||
maxCountByZone := int(0)
|
||||
maxCountByNodeName := int(0)
|
||||
|
||||
priorityMeta, ok := meta.(*priorityMetadata)
|
||||
if ok {
|
||||
selectors = priorityMeta.podSelectors
|
||||
} else {
|
||||
selectors = getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister, s.statefulSetLister)
|
||||
}
|
||||
|
||||
if len(selectors) > 0 {
|
||||
processNodeFunc := func(i int) {
|
||||
nodeName := nodes[i].Name
|
||||
count := float64(0)
|
||||
for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
|
||||
if pod.Namespace != nodePod.Namespace {
|
||||
continue
|
||||
}
|
||||
// When we are replacing a failed pod, we often see the previous
|
||||
// deleted version while scheduling the replacement.
|
||||
// Ignore the previous deleted version for spreading purposes
|
||||
// (it can still be considered for resource restrictions etc.)
|
||||
if nodePod.DeletionTimestamp != nil {
|
||||
glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
|
||||
continue
|
||||
}
|
||||
matches := false
|
||||
for _, selector := range selectors {
|
||||
if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
|
||||
matches = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if matches {
|
||||
count++
|
||||
}
|
||||
for i := range result {
|
||||
if result[i].Score > maxCountByNodeName {
|
||||
maxCountByNodeName = result[i].Score
|
||||
}
|
||||
zoneId := utilnode.GetZoneKey(nodes[i])
|
||||
|
||||
countsByNodeNameLock.Lock()
|
||||
defer countsByNodeNameLock.Unlock()
|
||||
countsByNodeName[nodeName] = count
|
||||
if count > maxCountByNodeName {
|
||||
maxCountByNodeName = count
|
||||
}
|
||||
if zoneId != "" {
|
||||
countsByZone[zoneId] += count
|
||||
zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
|
||||
if zoneId == "" {
|
||||
continue
|
||||
}
|
||||
countsByZone[zoneId] += result[i].Score
|
||||
}
|
||||
}
|
||||
|
||||
for zoneId := range countsByZone {
|
||||
if countsByZone[zoneId] > maxCountByZone {
|
||||
maxCountByZone = countsByZone[zoneId]
|
||||
}
|
||||
workqueue.Parallelize(16, len(nodes), processNodeFunc)
|
||||
}
|
||||
|
||||
// Aggregate by-zone information
|
||||
// Compute the maximum number of pods hosted in any zone
|
||||
haveZones := len(countsByZone) != 0
|
||||
maxCountByZone := float64(0)
|
||||
for _, count := range countsByZone {
|
||||
if count > maxCountByZone {
|
||||
maxCountByZone = count
|
||||
}
|
||||
}
|
||||
|
||||
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
|
||||
//score int - scale of 0-maxPriority
|
||||
// 0 being the lowest priority and maxPriority being the highest
|
||||
for _, node := range nodes {
|
||||
for i := range result {
|
||||
// initializing to the default/max node score of maxPriority
|
||||
fScore := float64(schedulerapi.MaxPriority)
|
||||
if maxCountByNodeName > 0 {
|
||||
fScore = float64(schedulerapi.MaxPriority) * ((maxCountByNodeName - countsByNodeName[node.Name]) / maxCountByNodeName)
|
||||
fScore = float64(schedulerapi.MaxPriority) * (float64(maxCountByNodeName-result[i].Score) / float64(maxCountByNodeName))
|
||||
}
|
||||
|
||||
// If there is zone information present, incorporate it
|
||||
if haveZones {
|
||||
zoneId := utilnode.GetZoneKey(node)
|
||||
zoneId := utilnode.GetZoneKey(nodeNameToInfo[result[i].Host].Node())
|
||||
if zoneId != "" {
|
||||
zoneScore := float64(schedulerapi.MaxPriority)
|
||||
if maxCountByZone > 0 {
|
||||
zoneScore = float64(schedulerapi.MaxPriority) * ((maxCountByZone - countsByZone[zoneId]) / maxCountByZone)
|
||||
zoneScore = float64(schedulerapi.MaxPriority) * (float64(maxCountByZone-countsByZone[zoneId]) / float64(maxCountByZone))
|
||||
}
|
||||
fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
|
||||
}
|
||||
}
|
||||
|
||||
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
|
||||
result[i].Score = int(fScore)
|
||||
if glog.V(10) {
|
||||
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
|
||||
// not logged. There is visible performance gain from it.
|
||||
glog.V(10).Infof(
|
||||
"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
|
||||
"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, result[i].Host, int(fScore),
|
||||
)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
type ServiceAntiAffinity struct {
|
||||
|
Loading…
Reference in New Issue
Block a user