diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
index 61438fe262e..728617d4ff2 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -69,6 +69,7 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
 type predicateMetadata struct {
 	podBestEffort bool
 	podRequest    *resourceRequest
+	podPorts      map[int]bool
 }

 func PredicateMetadata(pod *api.Pod) interface{} {
@@ -79,6 +80,7 @@ func PredicateMetadata(pod *api.Pod) interface{} {
 	return &predicateMetadata{
 		podBestEffort: isPodBestEffort(pod),
 		podRequest:    getResourceRequest(pod),
+		podPorts:      getUsedPorts(pod),
 	}
 }

@@ -479,8 +481,7 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
 	}

 	var podRequest *resourceRequest
-	predicateMeta, ok := meta.(*predicateMetadata)
-	if ok {
+	if predicateMeta, ok := meta.(*predicateMetadata); ok {
 		podRequest = predicateMeta.podRequest
 	} else {
 		// We couldn't parse metadata - fallback to computing it.
@@ -751,16 +752,21 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n
 }

 func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-	wantPorts := getUsedPorts(pod)
+	var wantPorts map[int]bool
+	if predicateMeta, ok := meta.(*predicateMetadata); ok {
+		wantPorts = predicateMeta.podPorts
+	} else {
+		// We couldn't parse metadata - fallback to computing it.
+		wantPorts = getUsedPorts(pod)
+	}
 	if len(wantPorts) == 0 {
 		return true, nil
 	}
+
+	// TODO: Aggregate it at the NodeInfo level.
 	existingPorts := getUsedPorts(nodeInfo.Pods()...)
 	for wport := range wantPorts {
-		if wport == 0 {
-			continue
-		}
-		if existingPorts[wport] {
+		if wport != 0 && existingPorts[wport] {
 			return false, ErrPodNotFitsHostPorts
 		}
 	}
@@ -768,7 +774,6 @@ func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
 }

 func getUsedPorts(pods ...*api.Pod) map[int]bool {
-	// TODO: Aggregate it at the NodeInfo level.
 	ports := make(map[int]bool)
 	for _, pod := range pods {
 		for j := range pod.Spec.Containers {
diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities.go b/plugin/pkg/scheduler/algorithm/priorities/priorities.go
index 7c29d2569ab..6c2a90b19a8 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/priorities.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/priorities.go
@@ -28,9 +28,25 @@ import (
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

+type resources struct {
+	millicpu int64
+	memory   int64
+}
+
+func getNonZeroRequests(pod *api.Pod) *resources {
+	result := &resources{}
+	for i := range pod.Spec.Containers {
+		container := &pod.Spec.Containers[i]
+		cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
+		result.millicpu += cpu
+		result.memory += memory
+	}
+	return result
+}
+
 // the unused capacity is calculated on a scale of 0-10
 // 0 being the lowest priority and 10 being the highest
-func calculateScore(requested int64, capacity int64, node string) int {
+func calculateScore(requested int64, capacity int64, node string) int64 {
 	if capacity == 0 {
 		return 0
 	}
@@ -39,36 +55,33 @@ func calculateScore(requested int64, capacity int64, node string) int {
 			requested, capacity, node)
 		return 0
 	}
-	return int(((capacity - requested) * 10) / capacity)
+	return ((capacity - requested) * 10) / capacity
 }

 // Calculate the resource occupancy on a node.  'node' has information about the resources on the node.
 // 'pods' is a list of pods currently scheduled on the node.
 // TODO: Use Node() from nodeInfo instead of passing it.
-func calculateResourceOccupancy(pod *api.Pod, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
-	totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
-	totalMemory := nodeInfo.NonZeroRequest().Memory
+func calculateResourceOccupancy(pod *api.Pod, podRequests *resources, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
 	capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
 	capacityMemory := node.Status.Allocatable.Memory().Value()

-	// Add the resources requested by the current pod being scheduled.
-	// This also helps differentiate between differently sized, but empty, nodes.
-	for i := range pod.Spec.Containers {
-		container := &pod.Spec.Containers[i]
-		cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
-		totalMilliCPU += cpu
-		totalMemory += memory
-	}
+	totalResources := *podRequests
+	totalResources.millicpu += nodeInfo.NonZeroRequest().MilliCPU
+	totalResources.memory += nodeInfo.NonZeroRequest().Memory

-	cpuScore := calculateScore(totalMilliCPU, capacityMilliCPU, node.Name)
-	memoryScore := calculateScore(totalMemory, capacityMemory, node.Name)
-	glog.V(10).Infof(
-		"%v -> %v: Least Requested Priority, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d CPU %d memory",
-		pod.Name, node.Name,
-		capacityMilliCPU, capacityMemory,
-		totalMilliCPU, totalMemory,
-		cpuScore, memoryScore,
-	)
+	cpuScore := calculateScore(totalResources.millicpu, capacityMilliCPU, node.Name)
+	memoryScore := calculateScore(totalResources.memory, capacityMemory, node.Name)
+	if glog.V(10) {
+		// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+		// not logged. There is visible performance gain from it.
+		glog.V(10).Infof(
+			"%v -> %v: Least Requested Priority, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d CPU %d memory",
+			pod.Name, node.Name,
+			capacityMilliCPU, capacityMemory,
+			totalResources.millicpu, totalResources.memory,
+			cpuScore, memoryScore,
+		)
+	}

 	return schedulerapi.HostPriority{
 		Host: node.Name,
@@ -86,9 +99,10 @@ func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulerca
 		return schedulerapi.HostPriorityList{}, err
 	}

+	podResources := getNonZeroRequests(pod)
 	list := make(schedulerapi.HostPriorityList, 0, len(nodes))
 	for _, node := range nodes {
-		list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name]))
+		list = append(list, calculateResourceOccupancy(pod, podResources, node, nodeNameToInfo[node.Name]))
 	}
 	return list, nil
 }
@@ -220,32 +234,26 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedul
 		return schedulerapi.HostPriorityList{}, err
 	}

+	podResources := getNonZeroRequests(pod)
 	list := make(schedulerapi.HostPriorityList, 0, len(nodes))
 	for _, node := range nodes {
-		list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name]))
+		list = append(list, calculateBalancedResourceAllocation(pod, podResources, node, nodeNameToInfo[node.Name]))
 	}
 	return list, nil
 }

 // TODO: Use Node() from nodeInfo instead of passing it.
-func calculateBalancedResourceAllocation(pod *api.Pod, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
-	totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
-	totalMemory := nodeInfo.NonZeroRequest().Memory
-	score := int(0)
-	// Add the resources requested by the current pod being scheduled.
-	// This also helps differentiate between differently sized, but empty, nodes.
-	for i := range pod.Spec.Containers {
-		container := &pod.Spec.Containers[i]
-		cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
-		totalMilliCPU += cpu
-		totalMemory += memory
-	}
-
+func calculateBalancedResourceAllocation(pod *api.Pod, podRequests *resources, node *api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
 	capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
 	capacityMemory := node.Status.Allocatable.Memory().Value()

-	cpuFraction := fractionOfCapacity(totalMilliCPU, capacityMilliCPU)
-	memoryFraction := fractionOfCapacity(totalMemory, capacityMemory)
+	totalResources := *podRequests
+	totalResources.millicpu += nodeInfo.NonZeroRequest().MilliCPU
+	totalResources.memory += nodeInfo.NonZeroRequest().Memory
+
+	cpuFraction := fractionOfCapacity(totalResources.millicpu, capacityMilliCPU)
+	memoryFraction := fractionOfCapacity(totalResources.memory, capacityMemory)
+	score := int(0)
 	if cpuFraction >= 1 || memoryFraction >= 1 {
 		// if requested >= capacity, the corresponding host should never be preferrred.
 		score = 0
@@ -257,13 +265,17 @@ func calculateBalancedResourceAllocation(pod *api.Pod, node *api.Node, nodeInfo
 		diff := math.Abs(cpuFraction - memoryFraction)
 		score = int(10 - diff*10)
 	}
-	glog.V(10).Infof(
-		"%v -> %v: Balanced Resource Allocation, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d",
-		pod.Name, node.Name,
-		capacityMilliCPU, capacityMemory,
-		totalMilliCPU, totalMemory,
-		score,
-	)
+	if glog.V(10) {
+		// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+		// not logged. There is visible performance gain from it.
+		glog.V(10).Infof(
+			"%v -> %v: Balanced Resource Allocation, capacity %d millicores %d memory bytes, total request %d millicores %d memory bytes, score %d",
+			pod.Name, node.Name,
+			capacityMilliCPU, capacityMemory,
+			totalResources.millicpu, totalResources.memory,
+			score,
+		)
+	}

 	return schedulerapi.HostPriority{
 		Host: node.Name,
diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
index ddcc05198ae..a8c1ce6b71d 100644
--- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
+++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
@@ -24,7 +24,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/labels"
 	utilnode "k8s.io/kubernetes/pkg/util/node"
-	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -32,7 +32,7 @@ import (

 // The maximum priority value to give to a node
 // Prioritiy values range from 0-maxPriority
-const maxPriority = 10
+const maxPriority float32 = 10

 // When zone information is present, give 2/3 of the weighting to zone spreading, 1/3 to node spreading
 // TODO: Any way to justify this weighting?
@@ -62,21 +62,18 @@ func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algo
 // pods which match the same service selectors or RC selectors as the pod being scheduled.
 // Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
 func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
-	selectors := make([]labels.Selector, 0)
-	services, err := s.serviceLister.GetPodServices(pod)
-	if err == nil {
+	selectors := make([]labels.Selector, 0, 3)
+	if services, err := s.serviceLister.GetPodServices(pod); err == nil {
 		for _, service := range services {
 			selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
 		}
 	}
-	rcs, err := s.controllerLister.GetPodControllers(pod)
-	if err == nil {
+	if rcs, err := s.controllerLister.GetPodControllers(pod); err == nil {
 		for _, rc := range rcs {
 			selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
 		}
 	}
-	rss, err := s.replicaSetLister.GetPodReplicaSets(pod)
-	if err == nil {
+	if rss, err := s.replicaSetLister.GetPodReplicaSets(pod); err == nil {
 		for _, rs := range rss {
 			if selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
 				selectors = append(selectors, selector)
@@ -90,96 +87,57 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 	}

 	// Count similar pods by node
-	countsByNodeName := map[string]int{}
+	countsByNodeName := make(map[string]float32, len(nodes))
+	countsByZone := make(map[string]float32, 10)
+	maxCountByNodeName := float32(0)
 	countsByNodeNameLock := sync.Mutex{}
 	if len(selectors) > 0 {
-		// Create a number of go-routines that will be computing number
-		// of "similar" pods for given nodes.
-		workers := 16
-		toProcess := make(chan string, len(nodes))
-		for i := range nodes {
-			toProcess <- nodes[i].Name
-		}
-		close(toProcess)
-
-		// TODO: Use Parallelize.
-		wg := sync.WaitGroup{}
-		wg.Add(workers)
-		for i := 0; i < workers; i++ {
-			go func() {
-				defer utilruntime.HandleCrash()
-				defer wg.Done()
-				for {
-					nodeName, ok := <-toProcess
-					if !ok {
-						return
-					}
-					count := 0
-					for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
-						if pod.Namespace != nodePod.Namespace {
-							continue
-						}
-						// When we are replacing a failed pod, we often see the previous
-						// deleted version while scheduling the replacement.
-						// Ignore the previous deleted version for spreading purposes
-						// (it can still be considered for resource restrictions etc.)
-						if nodePod.DeletionTimestamp != nil {
-							glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
-							continue
-						}
-						matches := false
-						for _, selector := range selectors {
-							if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
-								matches = true
-								break
-							}
-						}
-						if matches {
-							count++
-						}
-					}
-
-					func() {
-						countsByNodeNameLock.Lock()
-						defer countsByNodeNameLock.Unlock()
-						countsByNodeName[nodeName] = count
-					}()
+		processNodeFunc := func(i int) {
+			nodeName := nodes[i].Name
+			count := float32(0)
+			for _, nodePod := range nodeNameToInfo[nodeName].Pods() {
+				if pod.Namespace != nodePod.Namespace {
+					continue
 				}
-			}()
-		}
-		wg.Wait()
-	}
+				// When we are replacing a failed pod, we often see the previous
+				// deleted version while scheduling the replacement.
+				// Ignore the previous deleted version for spreading purposes
+				// (it can still be considered for resource restrictions etc.)
+				if nodePod.DeletionTimestamp != nil {
+					glog.V(4).Infof("skipping pending-deleted pod: %s/%s", nodePod.Namespace, nodePod.Name)
+					continue
+				}
+				matches := false
+				for _, selector := range selectors {
+					if selector.Matches(labels.Set(nodePod.ObjectMeta.Labels)) {
+						matches = true
+						break
+					}
+				}
+				if matches {
+					count++
+				}
+			}
+			zoneId := utilnode.GetZoneKey(nodes[i])

-	// Aggregate by-node information
-	// Compute the maximum number of pods hosted on any node
-	maxCountByNodeName := 0
-	for _, count := range countsByNodeName {
-		if count > maxCountByNodeName {
-			maxCountByNodeName = count
+			countsByNodeNameLock.Lock()
+			defer countsByNodeNameLock.Unlock()
+			countsByNodeName[nodeName] = count
+			if count > maxCountByNodeName {
+				maxCountByNodeName = count
+			}
+			if zoneId != "" {
+				countsByZone[zoneId] += count
+			}
 		}
-	}
-
-	// Count similar pods by zone, if zone information is present
-	countsByZone := map[string]int{}
-	for _, node := range nodes {
-		count, found := countsByNodeName[node.Name]
-		if !found {
-			continue
-		}
-
-		zoneId := utilnode.GetZoneKey(node)
-		if zoneId == "" {
-			continue
-		}
-
-		countsByZone[zoneId] += count
+		workqueue.Parallelize(16, len(nodes), processNodeFunc)
 	}

 	// Aggregate by-zone information
 	// Compute the maximum number of pods hosted in any zone
 	haveZones := len(countsByZone) != 0
-	maxCountByZone := 0
+	maxCountByZone := float32(0)
 	for _, count := range countsByZone {
 		if count > maxCountByZone {
 			maxCountByZone = count
@@ -191,24 +149,28 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
 	// 0 being the lowest priority and maxPriority being the highest
 	for _, node := range nodes {
 		// initializing to the default/max node score of maxPriority
-		fScore := float32(maxPriority)
+		fScore := maxPriority
 		if maxCountByNodeName > 0 {
-			fScore = maxPriority * (float32(maxCountByNodeName-countsByNodeName[node.Name]) / float32(maxCountByNodeName))
+			fScore = maxPriority * ((maxCountByNodeName - countsByNodeName[node.Name]) / maxCountByNodeName)
 		}

 		// If there is zone information present, incorporate it
 		if haveZones {
 			zoneId := utilnode.GetZoneKey(node)
 			if zoneId != "" {
-				zoneScore := maxPriority * (float32(maxCountByZone-countsByZone[zoneId]) / float32(maxCountByZone))
+				zoneScore := maxPriority * ((maxCountByZone - countsByZone[zoneId]) / maxCountByZone)
 				fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
 			}
 		}
 		result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
-		glog.V(10).Infof(
-			"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
-		)
+		if glog.V(10) {
+			// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
+			// not logged. There is visible performance gain from it.
+			glog.V(10).Infof(
+				"%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore),
+			)
+		}
 	}
 	return result, nil
 }
@@ -234,8 +196,7 @@ func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister
 func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
 	var nsServicePods []*api.Pod

-	services, err := s.serviceLister.GetPodServices(pod)
-	if err == nil {
+	if services, err := s.serviceLister.GetPodServices(pod); err == nil {
 		// just use the first service and get the other pods within the service
 		// TODO: a separate predicate can be created that tries to handle all services for the pod
 		selector := labels.SelectorFromSet(services[0].Spec.Selector)
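Taken together, the predicate changes follow a precompute-once pattern: PredicateMetadata gathers per-pod facts (resource request, best-effort flag, and now the used host ports) a single time, and each predicate type-asserts the opaque `meta` argument, falling back to recomputing when the assertion fails. The standalone Go sketch below illustrates that shape with simplified, hypothetical types; it is not the scheduler's actual code.

```go
package main

import "fmt"

// pod stands in for api.Pod; ports stands in for the container host ports.
type pod struct {
	name  string
	ports []int
}

// predicateMetadata mirrors the idea of precomputing per-pod data once per scheduling cycle.
type predicateMetadata struct {
	podPorts map[int]bool
}

// computeUsedPorts is the (potentially repeated) computation we want to run only once.
func computeUsedPorts(p *pod) map[int]bool {
	used := make(map[int]bool)
	for _, port := range p.ports {
		used[port] = true
	}
	return used
}

// predicateMeta builds the metadata that is handed to every predicate as interface{}.
func predicateMeta(p *pod) interface{} {
	return &predicateMetadata{podPorts: computeUsedPorts(p)}
}

// fitsHostPorts consumes the metadata when available and falls back otherwise,
// the same shape PodFitsHostPorts uses in the diff above.
func fitsHostPorts(p *pod, meta interface{}, nodePorts map[int]bool) bool {
	var wantPorts map[int]bool
	if m, ok := meta.(*predicateMetadata); ok {
		wantPorts = m.podPorts // precomputed once
	} else {
		wantPorts = computeUsedPorts(p) // fallback: metadata missing or of another type
	}
	for port := range wantPorts {
		if port != 0 && nodePorts[port] {
			return false
		}
	}
	return true
}

func main() {
	p := &pod{name: "web", ports: []int{8080}}
	meta := predicateMeta(p)
	nodePorts := map[int]bool{8080: true}
	fmt.Println(fitsHostPorts(p, meta, nodePorts)) // false: port 8080 is already taken on the node
}
```

The fallback branch keeps the predicate correct even when a caller passes nil or unrelated metadata, which is why PodFitsHostPorts retains the getUsedPorts(pod) path in the diff.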
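The selector-spreading change swaps the hand-rolled pool of 16 worker goroutines for workqueue.Parallelize(16, len(nodes), processNodeFunc) and folds the per-node and per-zone aggregation into the worker under the existing mutex. As a rough, self-contained sketch of what a Parallelize-style helper does (an illustrative reimplementation using only the standard library, not the actual pkg/util/workqueue code):

```go
package main

import (
	"fmt"
	"sync"
)

// parallelize runs doWorkPiece for every index in [0, pieces) using at most
// `workers` goroutines, mirroring the shape of workqueue.Parallelize.
func parallelize(workers, pieces int, doWorkPiece func(piece int)) {
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				doWorkPiece(piece)
			}
		}()
	}
	wg.Wait()
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c", "node-d"}

	// Shared results guarded by a mutex, like countsByNodeName/countsByZone in the diff.
	var lock sync.Mutex
	counts := make(map[string]float32, len(nodes))
	maxCount := float32(0)

	processNode := func(i int) {
		count := float32(len(nodes[i])) // stand-in for counting matching pods on nodes[i]

		lock.Lock()
		defer lock.Unlock()
		counts[nodes[i]] = count
		if count > maxCount {
			maxCount = count
		}
	}

	parallelize(2, len(nodes), processNode)
	fmt.Println(counts, maxCount)
}
```

Buffering the index channel up front and closing it lets each worker drain work with a plain range loop, which is the same queue shape the removed code built by hand with toProcess and a WaitGroup.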