Optimize selector spreading

Author: Wojciech Tyczynski
Date:   2016-07-12 11:32:34 +02:00
Parent: b9d13c5dbd
Commit: ae6b66207a


@@ -24,7 +24,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/labels"
 	utilnode "k8s.io/kubernetes/pkg/util/node"
-	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -32,7 +32,7 @@ import (
 // The maximum priority value to give to a node
 // Prioritiy values range from 0-maxPriority
-const maxPriority = 10
+const maxPriority float32 = 10
 // When zone information is present, give 2/3 of the weighting to zone spreading, 1/3 to node spreading
 // TODO: Any way to justify this weighting?
@@ -62,21 +62,18 @@ func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algo
 // pods which match the same service selectors or RC selectors as the pod being scheduled.
 // Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
 func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
-	selectors := make([]labels.Selector, 0)
-	services, err := s.serviceLister.GetPodServices(pod)
-	if err == nil {
+	selectors := make([]labels.Selector, 0, 3)
+	if services, err := s.serviceLister.GetPodServices(pod); err == nil {
 		for _, service := range services {
 			selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
 		}
 	}
-	rcs, err := s.controllerLister.GetPodControllers(pod)
-	if err == nil {
+	if rcs, err := s.controllerLister.GetPodControllers(pod); err == nil {
 		for _, rc := range rcs {
 			selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
 		}
 	}
-	rss, err := s.replicaSetLister.GetPodReplicaSets(pod)
-	if err == nil {
+	if rss, err := s.replicaSetLister.GetPodReplicaSets(pod); err == nil {
 		for _, rs := range rss {
 			if selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
 				selectors = append(selectors, selector)
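All three lookups above now use Go's if-with-initializer form, so the returned slice and its err stay scoped to the lookup that produced them instead of a single err variable being reused across the function. A minimal standalone sketch of the idiom, using a hypothetical lookupSelectors helper rather than the scheduler's actual listers:

package main

import (
	"errors"
	"fmt"
)

// lookupSelectors is a stand-in for a lister call such as GetPodServices.
func lookupSelectors(name string) ([]string, error) {
	if name == "" {
		return nil, errors.New("not found")
	}
	return []string{"app=" + name}, nil
}

func main() {
	selectors := make([]string, 0, 3)

	// Both return values are scoped to this if block; a failed lookup is
	// simply skipped, mirroring the best-effort behavior of the priority.
	if svcSelectors, err := lookupSelectors("guestbook"); err == nil {
		selectors = append(selectors, svcSelectors...)
	}
	if rcSelectors, err := lookupSelectors(""); err == nil { // error here is silently ignored
		selectors = append(selectors, rcSelectors...)
	}

	fmt.Println(selectors) // [app=guestbook]
}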
@@ -90,32 +87,15 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 	}
 	// Count similar pods by node
-	countsByNodeName := map[string]int{}
+	countsByNodeName := make(map[string]float32, len(nodes))
+	countsByZone := make(map[string]float32, 10)
+	maxCountByNodeName := float32(0)
 	countsByNodeNameLock := sync.Mutex{}
 	if len(selectors) > 0 {
-		// Create a number of go-routines that will be computing number
-		// of "similar" pods for given nodes.
-		workers := 16
-		toProcess := make(chan string, len(nodes))
-		for i := range nodes {
-			toProcess <- nodes[i].Name
-		}
-		close(toProcess)
-		// TODO: Use Parallelize.
-		wg := sync.WaitGroup{}
-		wg.Add(workers)
-		for i := 0; i < workers; i++ {
-			go func() {
-				defer utilruntime.HandleCrash()
-				defer wg.Done()
-				for {
-					nodeName, ok := <-toProcess
-					if !ok {
-						return
-					}
-					count := 0
+		processNodeFunc := func(i int) {
+			nodeName := nodes[i].Name
+			count := float32(0)
 			for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
 				if pod.Namespace != nodePod.Namespace {
 					continue
@@ -139,47 +119,25 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 					count++
 				}
 			}
-					func() {
+			zoneId := utilnode.GetZoneKey(nodes[i])
 			countsByNodeNameLock.Lock()
 			defer countsByNodeNameLock.Unlock()
 			countsByNodeName[nodeName] = count
-					}()
-				}
-			}()
-		}
-		wg.Wait()
-	}
-	// Aggregate by-node information
-	// Compute the maximum number of pods hosted on any node
-	maxCountByNodeName := 0
-	for _, count := range countsByNodeName {
 			if count > maxCountByNodeName {
 				maxCountByNodeName = count
 			}
-	}
-	// Count similar pods by zone, if zone information is present
-	countsByZone := map[string]int{}
-	for _, node := range nodes {
-		count, found := countsByNodeName[node.Name]
-		if !found {
-			continue
-		}
-		zoneId := utilnode.GetZoneKey(node)
-		if zoneId == "" {
-			continue
-		}
+			if zoneId != "" {
 				countsByZone[zoneId] += count
 			}
+		}
+		workqueue.Parallelize(16, len(nodes), processNodeFunc)
+	}
 	// Aggregate by-zone information
 	// Compute the maximum number of pods hosted in any zone
 	haveZones := len(countsByZone) != 0
-	maxCountByZone := 0
+	maxCountByZone := float32(0)
 	for _, count := range countsByZone {
 		if count > maxCountByZone {
 			maxCountByZone = count
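The core of the change in the hunks above is replacing the hand-rolled worker pool (a channel of node names drained by 16 goroutines coordinated with a sync.WaitGroup) with the shared workqueue.Parallelize helper, invoked here as workqueue.Parallelize(16, len(nodes), processNodeFunc), and folding the per-node and per-zone aggregation into that closure so everything is computed in one pass. A rough, self-contained sketch of the fan-out pattern such a helper encapsulates (illustrative only; this is not the actual k8s.io/kubernetes/pkg/util/workqueue implementation):

package main

import (
	"fmt"
	"sync"
)

// parallelize runs doWorkPiece(i) for every i in [0, pieces) using at most
// `workers` goroutines, and returns once all pieces have been processed.
func parallelize(workers, pieces int, doWorkPiece func(piece int)) {
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				doWorkPiece(piece)
			}
		}()
	}
	wg.Wait()
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c", "node-d"}
	counts := make([]int, len(nodes))

	// Each piece is an index, so every worker writes to a distinct slot and
	// no locking is needed in this toy example.
	parallelize(2, len(nodes), func(i int) {
		counts[i] = len(nodes[i])
	})
	fmt.Println(counts) // [6 6 6 6]
}

In the real priority function the work pieces write into shared maps keyed by node name and zone, which is why processNodeFunc still takes countsByNodeNameLock, whereas the toy example gets away with lock-free per-index writes.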
@@ -191,25 +149,29 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 	// 0 being the lowest priority and maxPriority being the highest
 	for _, node := range nodes {
 		// initializing to the default/max node score of maxPriority
-		fScore := float32(maxPriority)
+		fScore := maxPriority
 		if maxCountByNodeName > 0 {
-			fScore = maxPriority * (float32(maxCountByNodeName-countsByNodeName[node.Name]) / float32(maxCountByNodeName))
+			fScore = maxPriority * ((maxCountByNodeName - countsByNodeName[node.Name]) / maxCountByNodeName)
 		}
 		// If there is zone information present, incorporate it
 		if haveZones {
 			zoneId := utilnode.GetZoneKey(node)
 			if zoneId != "" {
-				zoneScore := maxPriority * (float32(maxCountByZone-countsByZone[zoneId]) / float32(maxCountByZone))
+				zoneScore := maxPriority * ((maxCountByZone - countsByZone[zoneId]) / maxCountByZone)
 				fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
 			}
 		}
 		result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
+		if glog.V(10) {
+			// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+			// not logged. There is visible performance gain from it.
 			glog.V(10).Infof(
 				"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
 			)
 		}
+	}
 	return result, nil
 }
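With maxPriority now a float32 constant, the scoring in the hunk above drops the repeated float32 conversions: a node starts at maxPriority, is scaled down by how many matching pods it already hosts relative to the busiest node, and, when zone labels are present, that score is blended one third node / two thirds zone (zoneWeighting) before truncation to an int. A worked trace of that arithmetic with made-up pod counts (all numbers below are assumptions chosen only for illustration):

package main

import "fmt"

const (
	maxPriority   float32 = 10
	zoneWeighting float32 = 2.0 / 3.0 // per the comment: 2/3 zone spreading, 1/3 node spreading
)

func main() {
	var (
		countOnNode  float32 = 1 // matching pods already on this node (assumed)
		maxCountNode float32 = 4 // most matching pods on any node (assumed)
		countInZone  float32 = 3 // matching pods in this node's zone (assumed)
		maxCountZone float32 = 6 // most matching pods in any zone (assumed)
	)

	// Node-level score: emptier nodes score closer to maxPriority.
	fScore := maxPriority * ((maxCountNode - countOnNode) / maxCountNode) // 7.5

	// Blend in the zone-level score with 2/3 weight.
	zoneScore := maxPriority * ((maxCountZone - countInZone) / maxCountZone) // 5.0
	fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)  // 2.5 + 3.33… ≈ 5.83

	fmt.Println(int(fScore)) // 5 — the same truncation used for the HostPriority score
}

The trailing if glog.V(10) guard in the hunk exists purely so the Infof arguments are not evaluated at lower verbosity levels, as the added comment notes.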
@@ -234,8 +196,7 @@ func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister
 func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
 	var nsServicePods []*api.Pod
-	services, err := s.serviceLister.GetPodServices(pod)
-	if err == nil {
+	if services, err := s.serviceLister.GetPodServices(pod); err == nil {
 		// just use the first service and get the other pods within the service
 		// TODO: a separate predicate can be created that tries to handle all services for the pod
 		selector := labels.SelectorFromSet(services[0].Spec.Selector)