Merge pull request #28836 from wojtek-t/optimize_priorities_3

Automatic merge from submit-queue

Few more optimizations for scheduler

Ref #28590 @davidopp

Commit 89be039352

@@ -69,6 +69,7 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
 type predicateMetadata struct {
     podBestEffort bool
     podRequest    *resourceRequest
+    podPorts      map[int]bool
 }
 
 func PredicateMetadata(pod *api.Pod) interface{} {
@@ -79,6 +80,7 @@ func PredicateMetadata(pod *api.Pod) interface{} {
     return &predicateMetadata{
         podBestEffort: isPodBestEffort(pod),
         podRequest:    getResourceRequest(pod),
+        podPorts:      getUsedPorts(pod),
     }
 }
 
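
The two hunks above add the pod's used host ports to the precomputed predicate metadata, so per-pod work is done once per scheduling cycle instead of once per candidate node. A minimal, dependency-free sketch of that compute-once, reuse-everywhere shape (shown here with resource totals); the types and names below are invented for illustration, not taken from this PR:

package main

import "fmt"

// Simplified stand-ins for the scheduler's types; illustrative only.
type container struct {
    milliCPU int64
    memory   int64
}

type podSpec struct {
    containers []container
}

// podMetadata mirrors the idea behind predicateMetadata: everything that
// depends only on the pod is computed once, before looping over nodes.
type podMetadata struct {
    milliCPU int64
    memory   int64
}

func computeMetadata(p *podSpec) *podMetadata {
    m := &podMetadata{}
    for _, c := range p.containers {
        m.milliCPU += c.milliCPU
        m.memory += c.memory
    }
    return m
}

// fitsNode consumes the precomputed metadata instead of re-walking the
// pod spec for every node it evaluates.
func fitsNode(meta *podMetadata, freeMilliCPU, freeMemory int64) bool {
    return meta.milliCPU <= freeMilliCPU && meta.memory <= freeMemory
}

func main() {
    pod := &podSpec{containers: []container{{milliCPU: 200, memory: 256 << 20}}}
    meta := computeMetadata(pod) // one pass over the pod, reused for all nodes
    for _, free := range []int64{100, 4000} {
        fmt.Println(fitsNode(meta, free, 8<<30)) // false, then true
    }
}
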
@@ -479,8 +481,7 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
     }
 
     var podRequest *resourceRequest
-    predicateMeta, ok := meta.(*predicateMetadata)
-    if ok {
+    if predicateMeta, ok := meta.(*predicateMetadata); ok {
         podRequest = predicateMeta.podRequest
     } else {
         // We couldn't parse metadata - fallback to computing it.
@@ -751,16 +752,21 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n
 }
 
 func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-    wantPorts := getUsedPorts(pod)
+    var wantPorts map[int]bool
+    if predicateMeta, ok := meta.(*predicateMetadata); ok {
+        wantPorts = predicateMeta.podPorts
+    } else {
+        // We couldn't parse metadata - fallback to computing it.
+        wantPorts = getUsedPorts(pod)
+    }
     if len(wantPorts) == 0 {
         return true, nil
     }
 
+    // TODO: Aggregate it at the NodeInfo level.
     existingPorts := getUsedPorts(nodeInfo.Pods()...)
     for wport := range wantPorts {
-        if wport == 0 {
-            continue
-        }
-        if existingPorts[wport] {
+        if wport != 0 && existingPorts[wport] {
             return false, ErrPodNotFitsHostPorts
         }
     }
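
PodFitsHostPorts now prefers the precomputed port set but keeps a fallback that recomputes it when the metadata cannot be type-asserted (for example, when a nil or foreign value is passed). A minimal sketch of that assert-or-recompute shape, using hypothetical names rather than the scheduler's real types:

package main

import "fmt"

// hostPortMeta stands in for predicateMetadata; illustrative only.
type hostPortMeta struct {
    ports map[int]bool
}

func portsFromSpec() map[int]bool {
    // Placeholder for the "recompute from the pod spec" fallback path.
    return map[int]bool{8080: true}
}

// wantedPorts shows the assert-or-recompute shape used by the predicate.
func wantedPorts(meta interface{}) map[int]bool {
    if m, ok := meta.(*hostPortMeta); ok {
        return m.ports // fast path: precomputed once per pod
    }
    // We couldn't parse metadata - fall back to computing it.
    return portsFromSpec()
}

func main() {
    precomputed := &hostPortMeta{ports: map[int]bool{443: true}}
    fmt.Println(wantedPorts(precomputed)) // map[443:true]
    fmt.Println(wantedPorts(nil))         // map[8080:true] via the fallback
}
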
@@ -768,7 +774,6 @@ func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
 }
 
 func getUsedPorts(pods ...*api.Pod) map[int]bool {
-    // TODO: Aggregate it at the NodeInfo level.
     ports := make(map[int]bool)
     for _, pod := range pods {
         for j := range pod.Spec.Containers {
@@ -28,9 +28,25 @@ import (
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
 
+type resources struct {
+    millicpu int64
+    memory   int64
+}
+
+func getNonZeroRequests(pod *api.Pod) *resources {
+    result := &resources{}
+    for i := range pod.Spec.Containers {
+        container := &pod.Spec.Containers[i]
+        cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
+        result.millicpu += cpu
+        result.memory += memory
+    }
+    return result
+}
+
 // the unused capacity is calculated on a scale of 0-10
 // 0 being the lowest priority and 10 being the highest
-func calculateScore(requested int64, capacity int64, node string) int {
+func calculateScore(requested int64, capacity int64, node string) int64 {
     if capacity == 0 {
         return 0
     }
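
The new getNonZeroRequests helper folds every container's CPU and memory request into a single per-pod total via priorityutil.GetNonzeroRequests, which substitutes default values for containers that declare no request so such pods still occupy some share of a node. A rough sketch of that summation; the default constants below are illustrative stand-ins, not necessarily the values the scheduler's priorityutil package uses:

package main

import "fmt"

// Illustrative defaults for containers with no explicit request.
const (
    defaultMilliCPU int64 = 100
    defaultMemory   int64 = 200 * 1024 * 1024
)

type containerRequest struct {
    milliCPU int64 // 0 means "not specified"
    memory   int64 // 0 means "not specified"
}

// nonzero mimics the intent of GetNonzeroRequests: empty requests still
// count as a small, non-zero consumption.
func nonzero(c containerRequest) (int64, int64) {
    cpu, mem := c.milliCPU, c.memory
    if cpu == 0 {
        cpu = defaultMilliCPU
    }
    if mem == 0 {
        mem = defaultMemory
    }
    return cpu, mem
}

func main() {
    pod := []containerRequest{{milliCPU: 500}, {}}
    var totalCPU, totalMem int64
    for _, c := range pod {
        cpu, mem := nonzero(c)
        totalCPU += cpu
        totalMem += mem
    }
    // 500 + 100 millicores, and twice the default memory, under these assumed defaults.
    fmt.Println(totalCPU, totalMem) // 600 419430400
}
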
@@ -39,36 +55,33 @@ func calculateScore(requested int64, capacity int64, node string) int {
         requested, capacity, node)
         return 0
     }
-    return int(((capacity - requested) * 10) / capacity)
+    return ((capacity - requested) * 10) / capacity
 }
 
 // Calculate the resource occupancy on a node. 'node' has information about the resources on the node.
 // 'pods' is a list of pods currently scheduled on the node.
 // TODO: Use Node() from nodeInfo instead of passing it.
-func calculateResourceOccupancy(pod *api.Pod, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
-    totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
-    totalMemory := nodeInfo.NonZeroRequest().Memory
+func calculateResourceOccupancy(pod *api.Pod, podRequests *resources, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
     capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
     capacityMemory := node.Status.Allocatable.Memory().Value()
 
-    // Add the resources requested by the current pod being scheduled.
-    // This also helps differentiate between differently sized, but empty, nodes.
-    for i := range pod.Spec.Containers {
-        container := &pod.Spec.Containers[i]
-        cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
-        totalMilliCPU += cpu
-        totalMemory += memory
-    }
+    totalResources := *podRequests
+    totalResources.millicpu += nodeInfo.NonZeroRequest().MilliCPU
+    totalResources.memory += nodeInfo.NonZeroRequest().Memory
 
-    cpuScore := calculateScore(totalMilliCPU, capacityMilliCPU, node.Name)
-    memoryScore := calculateScore(totalMemory, capacityMemory, node.Name)
-    glog.V(10).Infof(
-        "%v -> %v: Least Requested Priority, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d CPU %d memory",
-        pod.Name, node.Name,
-        capacityMilliCPU, capacityMemory,
-        totalMilliCPU, totalMemory,
-        cpuScore, memoryScore,
-    )
+    cpuScore := calculateScore(totalResources.millicpu, capacityMilliCPU, node.Name)
+    memoryScore := calculateScore(totalResources.memory, capacityMemory, node.Name)
+    if glog.V(10) {
+        // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+        // not logged. There is visible performance gain from it.
+        glog.V(10).Infof(
+            "%v -> %v: Least Requested Priority, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d CPU %d memory",
+            pod.Name, node.Name,
+            capacityMilliCPU, capacityMemory,
+            totalResources.millicpu, totalResources.memory,
+            cpuScore, memoryScore,
+        )
+    }
 
     return schedulerapi.HostPriority{
         Host: node.Name,
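
calculateScore now returns int64 and drops the redundant int conversion; the score itself is unchanged: the integer (capacity - requested) * 10 / capacity, i.e. the fraction of unused capacity mapped onto the 0-10 scale. A small worked example of that arithmetic, detached from the scheduler's types and logging:

package main

import "fmt"

// score reproduces the 0-10 "unused capacity" formula from calculateScore,
// without the node-name plumbing and error logging.
func score(requested, capacity int64) int64 {
    if capacity == 0 || requested > capacity {
        return 0
    }
    return ((capacity - requested) * 10) / capacity
}

func main() {
    // 2500 millicores requested on a 4000-millicore node:
    // (4000 - 2500) * 10 / 4000 = 3 (integer division).
    fmt.Println(score(2500, 4000)) // 3
    // An idle node scores the full 10.
    fmt.Println(score(0, 8000)) // 10
}
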
@@ -86,9 +99,10 @@ func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulerca
         return schedulerapi.HostPriorityList{}, err
     }
 
+    podResources := getNonZeroRequests(pod)
     list := make(schedulerapi.HostPriorityList, 0, len(nodes))
     for _, node := range nodes {
-        list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name]))
+        list = append(list, calculateResourceOccupancy(pod, podResources, node, nodeNameToInfo[node.Name]))
     }
     return list, nil
 }
@@ -220,32 +234,26 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedul
         return schedulerapi.HostPriorityList{}, err
     }
 
+    podResources := getNonZeroRequests(pod)
     list := make(schedulerapi.HostPriorityList, 0, len(nodes))
     for _, node := range nodes {
-        list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name]))
+        list = append(list, calculateBalancedResourceAllocation(pod, podResources, node, nodeNameToInfo[node.Name]))
     }
     return list, nil
 }
 
 // TODO: Use Node() from nodeInfo instead of passing it.
-func calculateBalancedResourceAllocation(pod *api.Pod, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
-    totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
-    totalMemory := nodeInfo.NonZeroRequest().Memory
-    score := int(0)
-    // Add the resources requested by the current pod being scheduled.
-    // This also helps differentiate between differently sized, but empty, nodes.
-    for i := range pod.Spec.Containers {
-        container := &pod.Spec.Containers[i]
-        cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
-        totalMilliCPU += cpu
-        totalMemory += memory
-    }
-
+func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *resources, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
     capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
     capacityMemory := node.Status.Allocatable.Memory().Value()
 
-    cpuFraction := fractionOfCapacity(totalMilliCPU, capacityMilliCPU)
-    memoryFraction := fractionOfCapacity(totalMemory, capacityMemory)
+    totalResources := *podRequests
+    totalResources.millicpu += nodeInfo.NonZeroRequest().MilliCPU
+    totalResources.memory += nodeInfo.NonZeroRequest().Memory
+
+    cpuFraction := fractionOfCapacity(totalResources.millicpu, capacityMilliCPU)
+    memoryFraction := fractionOfCapacity(totalResources.memory, capacityMemory)
+    score := int(0)
     if cpuFraction >= 1 || memoryFraction >= 1 {
         // if requested >= capacity, the corresponding host should never be preferrred.
         score = 0
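
The scoring rule of BalancedResourceAllocation is untouched by this restructuring: a node that would be overcommitted on either resource scores 0, and otherwise the score is 10 minus ten times the gap between the CPU and memory utilization fractions (the `10 - diff*10` step appears in the next hunk's context). A worked example of that arithmetic under invented capacities:

package main

import (
    "fmt"
    "math"
)

func fraction(requested, capacity int64) float64 {
    return float64(requested) / float64(capacity)
}

// balancedScore mirrors the scoring arithmetic: overcommitted hosts score 0,
// otherwise a smaller CPU/memory skew scores higher.
func balancedScore(cpuReq, cpuCap, memReq, memCap int64) int {
    cpuFraction := fraction(cpuReq, cpuCap)
    memoryFraction := fraction(memReq, memCap)
    if cpuFraction >= 1 || memoryFraction >= 1 {
        return 0
    }
    diff := math.Abs(cpuFraction - memoryFraction)
    return int(10 - diff*10)
}

func main() {
    // CPU at 50% and memory at 50%: perfectly balanced, score 10.
    fmt.Println(balancedScore(2000, 4000, 4<<30, 8<<30)) // 10
    // CPU at 75% but memory at 25%: diff 0.5, score 5.
    fmt.Println(balancedScore(3000, 4000, 2<<30, 8<<30)) // 5
}
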
@@ -257,13 +265,17 @@ func calculateBalancedResourceAllocation(pod *api.Pod, node *api.Node, nodeInfo
         diff := math.Abs(cpuFraction - memoryFraction)
         score = int(10 - diff*10)
     }
-    glog.V(10).Infof(
-        "%v -> %v: Balanced Resource Allocation, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d",
-        pod.Name, node.Name,
-        capacityMilliCPU, capacityMemory,
-        totalMilliCPU, totalMemory,
-        score,
-    )
+    if glog.V(10) {
+        // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+        // not logged. There is visible performance gain from it.
+        glog.V(10).Infof(
+            "%v -> %v: Balanced Resource Allocation, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d",
+            pod.Name, node.Name,
+            capacityMilliCPU, capacityMemory,
+            totalResources.millicpu, totalResources.memory,
+            score,
+        )
+    }
 
     return schedulerapi.HostPriority{
         Host: node.Name,
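
Both priority functions (and the selector spreading code further down) now wrap their V(10) statements in an explicit `if glog.V(10)` block; as the added comment says, without the guard Go evaluates every format argument before the call even when the message is discarded, and in a loop over every node that cost is visible. The effect, reduced to a dependency-free sketch with an invented `verbosity` flag and `expensiveSummary` helper:

package main

import "fmt"

var verbosity = 0 // stand-in for the configured glog -v level

var built int // counts how many times the summary was formatted

// logAt mimics a leveled logger: it drops the message below the threshold,
// but its arguments have already been evaluated by then.
func logAt(level int, msg string) {
    if verbosity >= level {
        fmt.Println(msg)
    }
}

func expensiveSummary() string {
    built++
    // Imagine formatting capacities, totals and scores for every node.
    return fmt.Sprintf("capacity=%d request=%d score=%d", 4000, 2500, 3)
}

func main() {
    // Unguarded: the summary string is built and then thrown away.
    logAt(10, expensiveSummary())

    // Guarded: nothing is formatted unless verbose logging is actually on.
    // This is the shape the PR gives to the V(10) statements.
    if verbosity >= 10 {
        logAt(10, expensiveSummary())
    }

    fmt.Println("summaries built:", built) // 1
}
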
@@ -24,7 +24,7 @@ import (
     "k8s.io/kubernetes/pkg/api/unversioned"
     "k8s.io/kubernetes/pkg/labels"
     utilnode "k8s.io/kubernetes/pkg/util/node"
-    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+    "k8s.io/kubernetes/pkg/util/workqueue"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
     schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -32,7 +32,7 @@ import (
 
 // The maximum priority value to give to a node
 // Prioritiy values range from 0-maxPriority
-const maxPriority = 10
+const maxPriority float32 = 10
 
 // When zone information is present, give 2/3 of the weighting to zone spreading, 1/3 to node spreading
 // TODO: Any way to justify this weighting?
@@ -62,21 +62,18 @@ func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algo
 // pods which match the same service selectors or RC selectors as the pod being scheduled.
 // Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
 func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
-    selectors := make([]labels.Selector, 0)
-    services, err := s.serviceLister.GetPodServices(pod)
-    if err == nil {
+    selectors := make([]labels.Selector, 0, 3)
+    if services, err := s.serviceLister.GetPodServices(pod); err == nil {
         for _, service := range services {
             selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
         }
     }
-    rcs, err := s.controllerLister.GetPodControllers(pod)
-    if err == nil {
+    if rcs, err := s.controllerLister.GetPodControllers(pod); err == nil {
         for _, rc := range rcs {
             selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
         }
     }
-    rss, err := s.replicaSetLister.GetPodReplicaSets(pod)
-    if err == nil {
+    if rss, err := s.replicaSetLister.GetPodReplicaSets(pod); err == nil {
         for _, rs := range rss {
             if selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
                 selectors = append(selectors, selector)
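
The three lister lookups above now use Go's if-with-initializer form, so each `services`/`rcs`/`rss` value and its `err` are scoped to their own block instead of sharing one `err` variable across the function. The idiom in isolation, with a toy lookup standing in for the listers:

package main

import (
    "errors"
    "fmt"
)

// lookup stands in for serviceLister/controllerLister/replicaSetLister calls.
func lookup(name string) ([]string, error) {
    if name == "missing" {
        return nil, errors.New("not found")
    }
    return []string{name + "-selector"}, nil
}

func main() {
    selectors := make([]string, 0, 3)

    // Each err lives only inside its own if-block, so a failed lookup is
    // simply skipped and cannot leak into the next check.
    if svcs, err := lookup("service"); err == nil {
        selectors = append(selectors, svcs...)
    }
    if rcs, err := lookup("missing"); err == nil {
        selectors = append(selectors, rcs...)
    }
    fmt.Println(selectors) // [service-selector]
}
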
@@ -90,96 +87,57 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
     }
 
     // Count similar pods by node
-    countsByNodeName := map[string]int{}
+    countsByNodeName := make(map[string]float32, len(nodes))
+    countsByZone := make(map[string]float32, 10)
+    maxCountByNodeName := float32(0)
     countsByNodeNameLock := sync.Mutex{}
 
     if len(selectors) > 0 {
-        // Create a number of go-routines that will be computing number
-        // of "similar" pods for given nodes.
-        workers := 16
-        toProcess := make(chan string, len(nodes))
-        for i := range nodes {
-            toProcess <- nodes[i].Name
-        }
-        close(toProcess)
-
-        // TODO: Use Parallelize.
-        wg := sync.WaitGroup{}
-        wg.Add(workers)
-        for i := 0; i < workers; i++ {
-            go func() {
-                defer utilruntime.HandleCrash()
-                defer wg.Done()
-                for {
-                    nodeName, ok := <-toProcess
-                    if !ok {
-                        return
-                    }
-                    count := 0
-                    for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
-                        if pod.Namespace != nodePod.Namespace {
-                            continue
-                        }
-                        // When we are replacing a failed pod, we often see the previous
-                        // deleted version while scheduling the replacement.
-                        // Ignore the previous deleted version for spreading purposes
-                        // (it can still be considered for resource restrictions etc.)
-                        if nodePod.DeletionTimestamp != nil {
-                            glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
-                            continue
-                        }
-                        matches := false
-                        for _, selector := range selectors {
-                            if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
-                                matches = true
-                                break
-                            }
-                        }
-                        if matches {
-                            count++
-                        }
-                    }
-
-                    func() {
-                        countsByNodeNameLock.Lock()
-                        defer countsByNodeNameLock.Unlock()
-                        countsByNodeName[nodeName] = count
-                    }()
-                }
-            }()
-        }
-        wg.Wait()
-    }
-
-    // Aggregate by-node information
-    // Compute the maximum number of pods hosted on any node
-    maxCountByNodeName := 0
-    for _, count := range countsByNodeName {
-        if count > maxCountByNodeName {
-            maxCountByNodeName = count
-        }
-    }
-
-    // Count similar pods by zone, if zone information is present
-    countsByZone := map[string]int{}
-    for _, node := range nodes {
-        count, found := countsByNodeName[node.Name]
-        if !found {
-            continue
-        }
-
-        zoneId := utilnode.GetZoneKey(node)
-        if zoneId == "" {
-            continue
-        }
-
-        countsByZone[zoneId] += count
+        processNodeFunc := func(i int) {
+            nodeName := nodes[i].Name
+            count := float32(0)
+            for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
+                if pod.Namespace != nodePod.Namespace {
+                    continue
+                }
+                // When we are replacing a failed pod, we often see the previous
+                // deleted version while scheduling the replacement.
+                // Ignore the previous deleted version for spreading purposes
+                // (it can still be considered for resource restrictions etc.)
+                if nodePod.DeletionTimestamp != nil {
+                    glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
+                    continue
+                }
+                matches := false
+                for _, selector := range selectors {
+                    if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
+                        matches = true
+                        break
+                    }
+                }
+                if matches {
+                    count++
+                }
+            }
+            zoneId := utilnode.GetZoneKey(nodes[i])
+
+            countsByNodeNameLock.Lock()
+            defer countsByNodeNameLock.Unlock()
+            countsByNodeName[nodeName] = count
+            if count > maxCountByNodeName {
+                maxCountByNodeName = count
+            }
+            if zoneId != "" {
+                countsByZone[zoneId] += count
+            }
+        }
+        workqueue.Parallelize(16, len(nodes), processNodeFunc)
     }
 
     // Aggregate by-zone information
     // Compute the maximum number of pods hosted in any zone
     haveZones := len(countsByZone) != 0
-    maxCountByZone := 0
+    maxCountByZone := float32(0)
    for _, count := range countsByZone {
        if count > maxCountByZone {
            maxCountByZone = count
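
The hunk above replaces the hand-rolled worker pool (a channel of node names, a WaitGroup, and utilruntime.HandleCrash in each goroutine) with a single workqueue.Parallelize call that fans processNodeFunc out over 16 workers, and it folds the by-node and by-zone aggregation into that per-node function under the existing mutex. The following is only a rough sketch of what such a Parallelize helper does, not the k8s.io/kubernetes/pkg/util/workqueue implementation:

package main

import (
    "fmt"
    "sync"
)

// parallelize runs doWorkPiece(i) for i in [0, pieces) using at most
// `workers` goroutines fed from a shared channel of indices.
func parallelize(workers, pieces int, doWorkPiece func(int)) {
    toProcess := make(chan int, pieces)
    for i := 0; i < pieces; i++ {
        toProcess <- i
    }
    close(toProcess)

    var wg sync.WaitGroup
    wg.Add(workers)
    for w := 0; w < workers; w++ {
        go func() {
            defer wg.Done()
            for i := range toProcess {
                doWorkPiece(i)
            }
        }()
    }
    wg.Wait()
}

func main() {
    nodes := []string{"node-a", "node-b", "node-c", "node-d"}
    counts := make([]int, len(nodes))
    // Each piece writes only to its own slot, so no lock is needed here; the
    // real scheduler code still takes a mutex because it updates shared maps.
    parallelize(2, len(nodes), func(i int) {
        counts[i] = len(nodes[i])
    })
    fmt.Println(counts) // [6 6 6 6]
}
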
@@ -191,24 +149,28 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
     // 0 being the lowest priority and maxPriority being the highest
     for _, node := range nodes {
         // initializing to the default/max node score of maxPriority
-        fScore := float32(maxPriority)
+        fScore := maxPriority
         if maxCountByNodeName > 0 {
-            fScore = maxPriority * (float32(maxCountByNodeName-countsByNodeName[node.Name]) / float32(maxCountByNodeName))
+            fScore = maxPriority * ((maxCountByNodeName - countsByNodeName[node.Name]) / maxCountByNodeName)
         }
 
         // If there is zone information present, incorporate it
         if haveZones {
             zoneId := utilnode.GetZoneKey(node)
             if zoneId != "" {
-                zoneScore := maxPriority * (float32(maxCountByZone-countsByZone[zoneId]) / float32(maxCountByZone))
+                zoneScore := maxPriority * ((maxCountByZone - countsByZone[zoneId]) / maxCountByZone)
                 fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
             }
         }
 
         result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
-        glog.V(10).Infof(
-            "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
-        )
+        if glog.V(10) {
+            // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+            // not logged. There is visible performance gain from it.
+            glog.V(10).Infof(
+                "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
+            )
+        }
     }
     return result, nil
 }
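
With maxPriority and the counts now float32, the final blend needs no per-expression conversions: fScore = nodeScore*(1 - zoneWeighting) + zoneWeighting*zoneScore, where the file documents zoneWeighting as 2/3 zone spreading and 1/3 node spreading. A small numeric walk-through of that blend with invented counts:

package main

import "fmt"

const (
    maxPriority   float32 = 10
    zoneWeighting float32 = 2.0 / 3.0 // 2/3 zone spreading, 1/3 node spreading
)

// spreadScore mirrors the blending step: fewer matching pods on the node
// and in its zone (relative to the observed maxima) means a higher score.
func spreadScore(nodeCount, maxNodeCount, zoneCount, maxZoneCount float32) int {
    fScore := maxPriority
    if maxNodeCount > 0 {
        fScore = maxPriority * ((maxNodeCount - nodeCount) / maxNodeCount)
    }
    if maxZoneCount > 0 {
        zoneScore := maxPriority * ((maxZoneCount - zoneCount) / maxZoneCount)
        fScore = fScore*(1.0-zoneWeighting) + zoneWeighting*zoneScore
    }
    return int(fScore)
}

func main() {
    // Node already runs 1 of max 4 matching pods, its zone 2 of max 6:
    // node part = 10*(3/4) = 7.5, zone part = 10*(4/6) ≈ 6.67,
    // blended = 7.5/3 + 6.67*2/3 ≈ 6.9, truncated to 6.
    fmt.Println(spreadScore(1, 4, 2, 6)) // 6
}
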
@@ -234,8 +196,7 @@ func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister
 func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
     var nsServicePods []*api.Pod
 
-    services, err := s.serviceLister.GetPodServices(pod)
-    if err == nil {
+    if services, err := s.serviceLister.GetPodServices(pod); err == nil {
         // just use the first service and get the other pods within the service
         // TODO: a separate predicate can be created that tries to handle all services for the pod
         selector := labels.SelectorFromSet(services[0].Spec.Selector)