Merge pull request #81068 from Huang-Wei/eps-structure-optimize

scheduler: internal data structure optimization
Commit e7ecb22403 by Kubernetes Prow Robot, 2019-08-20 20:39:58 -07:00, committed via GitHub.
8 changed files with 579 additions and 651 deletions


@ -39,7 +39,7 @@ import (
type PredicateMetadata interface {
ShallowCopy() PredicateMetadata
AddPod(addedPod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) error
RemovePod(deletedPod *v1.Pod) error
RemovePod(deletedPod *v1.Pod, node *v1.Node) error
}
// PredicateMetadataProducer is a function that computes predicate metadata for a given pod.
@ -67,17 +67,67 @@ type topologyPairsMaps struct {
podToTopologyPairs map[string]topologyPairSet
}
// topologyPairsPodSpreadMap combines topologyKeyToMinPodsMap and topologyPairsMaps
type criticalPath struct {
// topologyValue denotes the topology value mapping to topology key.
topologyValue string
// matchNum denotes the number of matching pods.
matchNum int32
}
// CAVEAT: `[2]criticalPath` only works because of how the current preemption
// algorithm is implemented, in particular the following 2 facts:
// Fact 1: we only preempt pods on the same node, instead of pods on multiple nodes.
// Fact 2: each node is evaluated on a separate copy of the metadata during its preemption cycle.
// If we plan to switch to a more complex algorithm like "arbitrary pods on multiple nodes", this
// structure needs to be revisited.
type criticalPaths [2]criticalPath
func newCriticalPaths() *criticalPaths {
return &criticalPaths{{matchNum: math.MaxInt32}, {matchNum: math.MaxInt32}}
}
func (paths *criticalPaths) update(tpVal string, num int32) {
// first, check whether `tpVal` already exists in either slot
i := -1
if tpVal == paths[0].topologyValue {
i = 0
} else if tpVal == paths[1].topologyValue {
i = 1
}
if i >= 0 {
// `tpVal` exists
paths[i].matchNum = num
if paths[0].matchNum > paths[1].matchNum {
// swap paths[0] and paths[1]
paths[0], paths[1] = paths[1], paths[0]
}
} else {
// `tpVal` doesn't exist
if num < paths[0].matchNum {
// update paths[1] with paths[0]
paths[1] = paths[0]
// update paths[0]
paths[0].topologyValue, paths[0].matchNum = tpVal, num
} else if num < paths[1].matchNum {
// update paths[1]
paths[1].topologyValue, paths[1].matchNum = tpVal, num
}
}
}
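
To make the two-slot invariant concrete, here is a minimal, self-contained sketch (not part of the diff) of how update behaves; the types and method body mirror the code above, and the zone values are made up for illustration:

package main

import (
	"fmt"
	"math"
)

type criticalPath struct {
	topologyValue string // topology value mapping to the topology key
	matchNum      int32  // number of matching pods
}

type criticalPaths [2]criticalPath

func newCriticalPaths() *criticalPaths {
	return &criticalPaths{{matchNum: math.MaxInt32}, {matchNum: math.MaxInt32}}
}

func (paths *criticalPaths) update(tpVal string, num int32) {
	// same logic as the update method in the diff above
	i := -1
	if tpVal == paths[0].topologyValue {
		i = 0
	} else if tpVal == paths[1].topologyValue {
		i = 1
	}
	if i >= 0 {
		paths[i].matchNum = num
		if paths[0].matchNum > paths[1].matchNum {
			paths[0], paths[1] = paths[1], paths[0]
		}
	} else {
		if num < paths[0].matchNum {
			paths[1] = paths[0]
			paths[0].topologyValue, paths[0].matchNum = tpVal, num
		} else if num < paths[1].matchNum {
			paths[1].topologyValue, paths[1].matchNum = tpVal, num
		}
	}
}

func main() {
	paths := newCriticalPaths()
	paths.update("zone1", 3) // new value: paths[0] = {zone1, 3}
	paths.update("zone2", 1) // new and smaller: paths[0] = {zone2, 1}, paths[1] = {zone1, 3}
	paths.update("zone2", 4) // existing value grows; after the swap paths[0] = {zone1, 3}
	// paths[0].matchNum always holds the global minimum; paths[1] is only an upper bound.
	fmt.Println(paths[0], paths[1]) // {zone1 3} {zone2 4}
}
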
// podSpreadCache combines tpKeyToCriticalPaths and tpPairToMatchNum
// to represent:
// (1) minimum number of pods matched on the spread constraints.
// (2) how existing pods match incoming pod on its spread constraints.
type topologyPairsPodSpreadMap struct {
// This map is keyed with a topology key, and valued with minimum number
// of pods matched on that topology domain.
// TODO(Huang-Wei): refactor to {tpKey->tpValSet(or tpValSlice)}
topologyKeyToMinPodsMap map[string]int32
// TODO(Huang-Wei): refactor to {tpPair->count, podName->tpPairSet(optional)}
*topologyPairsMaps
// (1) critical paths where the least pods are matched on each spread constraint.
// (2) number of pods matched on each spread constraint.
type podSpreadCache struct {
// We record 2 critical paths instead of all critical paths here.
// criticalPaths[0].matchNum always holds the minimum matching number.
// criticalPaths[1].matchNum is always greater than or equal to criticalPaths[0].matchNum, but
// it's not guaranteed to be the 2nd minimum match number.
tpKeyToCriticalPaths map[string]*criticalPaths
// tpPairToMatchNum is keyed with topologyPair, and valued with the number of matching pods.
tpPairToMatchNum map[topologyPair]int32
}
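
For intuition about what the two maps hold, suppose the incoming pod has one hard constraint on topologyKey "zone", and (hypothetically) 3 existing pods match in zone1 while 1 matches in zone2. Assuming the criticalPaths and topologyPair types above, the cache would then look roughly like:

cache := podSpreadCache{
	tpKeyToCriticalPaths: map[string]*criticalPaths{
		// slot 0 carries the global minimum match number for "zone"
		"zone": {{"zone2", 1}, {"zone1", 3}},
	},
	tpPairToMatchNum: map[topologyPair]int32{
		{key: "zone", value: "zone1"}: 3,
		{key: "zone", value: "zone2"}: 1,
	},
}
_ = cache
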
// NOTE: When new fields are added/removed or logic is changed, please make sure that
@ -105,9 +155,9 @@ type predicateMetadata struct {
// which should be accounted only by the extenders. This set is synthesized
// from scheduler extender configuration and does not change per pod.
ignoredExtendedResources sets.String
// Similar to the map for pod (anti-)affinity, but imposes additional min matches info
// to describe minimum match number on each topology spread constraint.
topologyPairsPodSpreadMap *topologyPairsPodSpreadMap
// podSpreadCache holds info of the minimum match number on each topology spread constraint,
// and the match number of all valid topology pairs.
podSpreadCache *podSpreadCache
}
// Ensure that predicateMetadata implements algorithm.PredicateMetadata.
@ -154,9 +204,9 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
if pod == nil {
return nil
}
// existingPodSpreadConstraintsMap represents how existing pods match "pod"
// existingPodSpreadCache represents how existing pods match "pod"
// on its spread constraints
existingPodSpreadConstraintsMap, err := getTPMapMatchingSpreadConstraints(pod, nodeNameToInfoMap)
existingPodSpreadCache, err := getExistingPodSpreadCache(pod, nodeNameToInfoMap)
if err != nil {
klog.Errorf("Error calculating spreadConstraintsMap: %v", err)
return nil
@ -182,7 +232,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
topologyPairsPotentialAffinityPods: incomingPodAffinityMap,
topologyPairsPotentialAntiAffinityPods: incomingPodAntiAffinityMap,
topologyPairsAntiAffinityPodsMap: existingPodAntiAffinityMap,
topologyPairsPodSpreadMap: existingPodSpreadConstraintsMap,
podSpreadCache: existingPodSpreadCache,
}
for predicateName, precomputeFunc := range predicateMetadataProducers {
klog.V(10).Infof("Precompute: %v", predicateName)
@ -191,7 +241,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
return predicateMetadata
}
func getTPMapMatchingSpreadConstraints(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (*topologyPairsPodSpreadMap, error) {
func getExistingPodSpreadCache(pod *v1.Pod, nodeInfoMap map[string]*schedulernodeinfo.NodeInfo) (*podSpreadCache, error) {
// We have feature gating in APIServer to strip the spec
// so don't need to re-check feature gate, just check length of constraints.
constraints := getHardTopologySpreadConstraints(pod)
@ -207,14 +257,15 @@ func getTPMapMatchingSpreadConstraints(pod *v1.Pod, nodeInfoMap map[string]*sche
errCh := schedutil.NewErrorChannel()
var lock sync.Mutex
topologyPairsPodSpreadMap := &topologyPairsPodSpreadMap{
// topologyKeyToMinPodsMap will be initialized with proper size later.
topologyPairsMaps: newTopologyPairsMaps(),
// TODO(Huang-Wei): It might be possible to use "make(map[topologyPair]*int32)".
// In that case, need to consider how to initialize each tpPairToMatchNum[pair] in an atomic fashion.
m := podSpreadCache{
tpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
tpPairToMatchNum: make(map[topologyPair]int32),
}
appendTopologyPairsMaps := func(toAppend *topologyPairsMaps) {
addTopologyPairMatchNum := func(pair topologyPair, num int32) {
lock.Lock()
topologyPairsPodSpreadMap.appendMaps(toAppend)
m.tpPairToMatchNum[pair] += num
lock.Unlock()
}
@ -237,9 +288,8 @@ func getTPMapMatchingSpreadConstraints(pod *v1.Pod, nodeInfoMap map[string]*sche
if !NodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
return
}
nodeTopologyMaps := newTopologyPairsMaps()
for _, constraint := range constraints {
pairAdded := false
matchTotal := int32(0)
// nodeInfo.Pods() can be empty; or none of its pods may match
for _, existingPod := range nodeInfo.Pods() {
if existingPod.Namespace != pod.Namespace {
@ -251,26 +301,12 @@ func getTPMapMatchingSpreadConstraints(pod *v1.Pod, nodeInfoMap map[string]*sche
return
}
if ok {
// constraint.TopologyKey is already guaranteed to be present
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
nodeTopologyMaps.addTopologyPair(pair, existingPod)
pairAdded = true
matchTotal++
}
}
// If needed, append topology pair without entry of pods.
// For example, on node-x, there is no pod matching spread constraints,
// but node-x should be also considered as a match (with match number 0)
// i.e. <node: node-x>: {}
if !pairAdded {
pair := topologyPair{
key: constraint.TopologyKey,
value: node.Labels[constraint.TopologyKey],
}
nodeTopologyMaps.addTopologyPairWithoutPods(pair)
}
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
addTopologyPairMatchNum(pair, matchTotal)
}
appendTopologyPairsMaps(nodeTopologyMaps)
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processNode)
@ -279,18 +315,15 @@ func getTPMapMatchingSpreadConstraints(pod *v1.Pod, nodeInfoMap map[string]*sche
}
// calculate min match for each topology pair
topologyPairsPodSpreadMap.topologyKeyToMinPodsMap = make(map[string]int32, len(constraints))
for _, constraint := range constraints {
topologyPairsPodSpreadMap.topologyKeyToMinPodsMap[constraint.TopologyKey] = math.MaxInt32
for i := 0; i < len(constraints); i++ {
key := constraints[i].TopologyKey
m.tpKeyToCriticalPaths[key] = newCriticalPaths()
}
for pair, podSet := range topologyPairsPodSpreadMap.topologyPairToPods {
// TODO(Huang-Wei): short circuit unvisited portions of <topologyKey: any value>
// if we already see 0 as min match of that topologyKey.
if l := int32(len(podSet)); l < topologyPairsPodSpreadMap.topologyKeyToMinPodsMap[pair.key] {
topologyPairsPodSpreadMap.topologyKeyToMinPodsMap[pair.key] = l
}
for pair, num := range m.tpPairToMatchNum {
m.tpKeyToCriticalPaths[pair.key].update(pair.value, num)
}
return topologyPairsPodSpreadMap, nil
return &m, nil
}
func getHardTopologySpreadConstraints(pod *v1.Pod) (constraints []v1.TopologySpreadConstraint) {
@ -337,7 +370,9 @@ func newTopologyPairsMaps() *topologyPairsMaps {
func (m *topologyPairsMaps) addTopologyPair(pair topologyPair, pod *v1.Pod) {
podFullName := schedutil.GetPodFullName(pod)
m.addTopologyPairWithoutPods(pair)
if m.topologyPairToPods[pair] == nil {
m.topologyPairToPods[pair] = make(map[*v1.Pod]struct{})
}
m.topologyPairToPods[pair][pod] = struct{}{}
if m.podToTopologyPairs[podFullName] == nil {
m.podToTopologyPairs[podFullName] = make(map[topologyPair]struct{})
@ -345,13 +380,6 @@ func (m *topologyPairsMaps) addTopologyPair(pair topologyPair, pod *v1.Pod) {
m.podToTopologyPairs[podFullName][pair] = struct{}{}
}
// add a topology pair holder if needed
func (m *topologyPairsMaps) addTopologyPairWithoutPods(pair topologyPair) {
if m.topologyPairToPods[pair] == nil {
m.topologyPairToPods[pair] = make(map[*v1.Pod]struct{})
}
}
func (m *topologyPairsMaps) removePod(deletedPod *v1.Pod) {
deletedPodFullName := schedutil.GetPodFullName(deletedPod)
for pair := range m.podToTopologyPairs[deletedPodFullName] {
@ -368,12 +396,8 @@ func (m *topologyPairsMaps) appendMaps(toAppend *topologyPairsMaps) {
return
}
for pair := range toAppend.topologyPairToPods {
if podSet := toAppend.topologyPairToPods[pair]; len(podSet) == 0 {
m.addTopologyPairWithoutPods(pair)
} else {
for pod := range podSet {
m.addTopologyPair(pair, pod)
}
for pod := range toAppend.topologyPairToPods[pair] {
m.addTopologyPair(pair, pod)
}
}
}
@ -384,8 +408,16 @@ func (m *topologyPairsMaps) clone() *topologyPairsMaps {
return copy
}
func (m *topologyPairsPodSpreadMap) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) error {
if addedPod.Namespace != preemptorPod.Namespace {
func (c *podSpreadCache) addPod(addedPod, preemptorPod *v1.Pod, node *v1.Node) error {
return c.updatePod(addedPod, preemptorPod, node, 1)
}
func (c *podSpreadCache) removePod(deletedPod, preemptorPod *v1.Pod, node *v1.Node) {
c.updatePod(deletedPod, preemptorPod, node, -1)
}
func (c *podSpreadCache) updatePod(updatedPod, preemptorPod *v1.Pod, node *v1.Node, delta int32) error {
if updatedPod.Namespace != preemptorPod.Namespace || node == nil {
return nil
}
constraints := getHardTopologySpreadConstraints(preemptorPod)
@ -393,98 +425,45 @@ func (m *topologyPairsPodSpreadMap) addPod(addedPod, preemptorPod *v1.Pod, node
return nil
}
// records which topology key(s) needs to be updated
minMatchNeedingUpdate := make(map[string]struct{})
podLabelSet := labels.Set(addedPod.Labels)
podLabelSet := labels.Set(updatedPod.Labels)
for _, constraint := range constraints {
if match, err := PodMatchesSpreadConstraint(podLabelSet, constraint); err != nil {
return err
} else if !match {
continue
}
pair := topologyPair{
key: constraint.TopologyKey,
value: node.Labels[constraint.TopologyKey],
}
// it means current node is one of the critical paths of topologyKeyToMinPodsMap[TopologyKey]
if int32(len(m.topologyPairToPods[pair])) == m.topologyKeyToMinPodsMap[pair.key] {
minMatchNeedingUpdate[pair.key] = struct{}{}
}
m.addTopologyPair(pair, addedPod)
}
// no need to addTopologyPairWithoutPods b/c if a pair without pods must be present,
// it should have already been created earlier in removePod() phase
// In most cases, min match map doesn't need to be updated.
// But it's required to be updated when current node is the ONLY critical path which impacts
// the min match. With that said, in this case min match needs to be updated to min match + 1
if len(minMatchNeedingUpdate) != 0 {
// TODO(Huang-Wei): performance can be optimized.
// A possible solution is to record number of critical paths which co-impact the min match.
tempMinMatchMap := make(map[string]int32, len(minMatchNeedingUpdate))
for key := range minMatchNeedingUpdate {
tempMinMatchMap[key] = math.MaxInt32
}
for pair, podSet := range m.topologyPairToPods {
if _, ok := minMatchNeedingUpdate[pair.key]; !ok {
continue
}
if l := int32(len(podSet)); l < tempMinMatchMap[pair.key] {
tempMinMatchMap[pair.key] = l
}
}
for key, tempMin := range tempMinMatchMap {
if tempMin == m.topologyKeyToMinPodsMap[key]+1 {
m.topologyKeyToMinPodsMap[key] = tempMin
}
}
k, v := constraint.TopologyKey, node.Labels[constraint.TopologyKey]
pair := topologyPair{key: k, value: v}
c.tpPairToMatchNum[pair] = c.tpPairToMatchNum[pair] + delta
c.tpKeyToCriticalPaths[k].update(v, c.tpPairToMatchNum[pair])
}
return nil
}
func (m *topologyPairsPodSpreadMap) removePod(deletedPod *v1.Pod) {
if m == nil || deletedPod == nil {
return
}
deletedPodFullName := schedutil.GetPodFullName(deletedPod)
pairSet, ok := m.podToTopologyPairs[deletedPodFullName]
if !ok {
return
}
topologyPairToPods := m.topologyPairToPods
for pair := range pairSet {
delete(topologyPairToPods[pair], deletedPod)
// if topologyPairToPods[pair] is empty after deletion
// don't clean it up as that topology counts as a match now
// removal of the deletedPod would probably generate a smaller matching number
// so re-calculate minMatch to a smaller value if possible
if l := int32(len(topologyPairToPods[pair])); l < m.topologyKeyToMinPodsMap[pair.key] {
m.topologyKeyToMinPodsMap[pair.key] = l
}
}
delete(m.podToTopologyPairs, deletedPodFullName)
}
func (m *topologyPairsPodSpreadMap) clone() *topologyPairsPodSpreadMap {
// m could be nil when EvenPodsSpread feature is disabled
if m == nil {
func (c *podSpreadCache) clone() *podSpreadCache {
// c could be nil when EvenPodsSpread feature is disabled
if c == nil {
return nil
}
copy := &topologyPairsPodSpreadMap{
topologyKeyToMinPodsMap: make(map[string]int32),
topologyPairsMaps: m.topologyPairsMaps.clone(),
copy := podSpreadCache{
tpKeyToCriticalPaths: make(map[string]*criticalPaths),
tpPairToMatchNum: make(map[topologyPair]int32),
}
for key, minMatched := range m.topologyKeyToMinPodsMap {
copy.topologyKeyToMinPodsMap[key] = minMatched
for tpKey, paths := range c.tpKeyToCriticalPaths {
copy.tpKeyToCriticalPaths[tpKey] = &criticalPaths{paths[0], paths[1]}
}
return copy
for tpPair, matchNum := range c.tpPairToMatchNum {
copyPair := topologyPair{key: tpPair.key, value: tpPair.value}
copy.tpPairToMatchNum[copyPair] = matchNum
}
return &copy
}
// RemovePod changes predicateMetadata assuming that the given `deletedPod` is
// deleted from the system.
func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod, node *v1.Node) error {
deletedPodFullName := schedutil.GetPodFullName(deletedPod)
if deletedPodFullName == schedutil.GetPodFullName(meta.pod) {
return fmt.Errorf("deletedPod and meta.pod must not be the same")
@ -494,7 +473,7 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
meta.topologyPairsPotentialAffinityPods.removePod(deletedPod)
meta.topologyPairsPotentialAntiAffinityPods.removePod(deletedPod)
// Delete pod from the pod spread topology maps.
meta.topologyPairsPodSpreadMap.removePod(deletedPod)
meta.podSpreadCache.removePod(deletedPod, meta.pod, node)
// All pods in the serviceAffinityMatchingPodList are in the same namespace.
// So, if the namespace of the first one is not the same as the namespace of the
// deletedPod, we don't need to check the list, as deletedPod isn't in the list.
@ -556,9 +535,9 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulernodei
}
}
}
// Update meta.topologyPairsPodSpreadMap if meta.pod has hard spread constraints
// Update meta.podSpreadCache if meta.pod has hard spread constraints
// and addedPod matches that
if err := meta.topologyPairsPodSpreadMap.addPod(addedPod, meta.pod, nodeInfo.Node()); err != nil {
if err := meta.podSpreadCache.addPod(addedPod, meta.pod, nodeInfo.Node()); err != nil {
return err
}
@ -588,7 +567,7 @@ func (meta *predicateMetadata) ShallowCopy() PredicateMetadata {
newPredMeta.topologyPairsPotentialAffinityPods = meta.topologyPairsPotentialAffinityPods.clone()
newPredMeta.topologyPairsPotentialAntiAffinityPods = meta.topologyPairsPotentialAntiAffinityPods.clone()
newPredMeta.topologyPairsAntiAffinityPodsMap = meta.topologyPairsAntiAffinityPodsMap.clone()
newPredMeta.topologyPairsPodSpreadMap = meta.topologyPairsPodSpreadMap.clone()
newPredMeta.podSpreadCache = meta.podSpreadCache.clone()
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
meta.serviceAffinityMatchingPodServices...)
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),

File diff suppressed because it is too large.


@ -1796,15 +1796,15 @@ func EvenPodsSpreadPredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *sche
return true, nil, nil
}
var topologyPairsPodSpreadMap *topologyPairsPodSpreadMap
var podSpreadCache *podSpreadCache
if predicateMeta, ok := meta.(*predicateMetadata); ok {
topologyPairsPodSpreadMap = predicateMeta.topologyPairsPodSpreadMap
podSpreadCache = predicateMeta.podSpreadCache
} else { // We don't have precomputed metadata. We have to follow a slow path to check spread constraints.
// TODO(Huang-Wei): get it implemented
// TODO(autoscaler): get it implemented
return false, nil, errors.New("metadata not pre-computed for EvenPodsSpreadPredicate")
}
if topologyPairsPodSpreadMap == nil || len(topologyPairsPodSpreadMap.topologyKeyToMinPodsMap) == 0 {
if podSpreadCache == nil || len(podSpreadCache.tpPairToMatchNum) == 0 {
return true, nil, nil
}
@ -1821,25 +1821,24 @@ func EvenPodsSpreadPredicate(pod *v1.Pod, meta PredicateMetadata, nodeInfo *sche
if err != nil {
return false, nil, err
}
selfMatchNum := 0
selfMatchNum := int32(0)
if selfMatch {
selfMatchNum = 1
}
pair := topologyPair{key: tpKey, value: tpVal}
minMatchNum, ok := topologyPairsPodSpreadMap.topologyKeyToMinPodsMap[tpKey]
paths, ok := podSpreadCache.tpKeyToCriticalPaths[tpKey]
if !ok {
// error which should not happen
klog.Errorf("internal error: get minMatchNum from key %q of %#v", tpKey, topologyPairsPodSpreadMap.topologyKeyToMinPodsMap)
klog.Errorf("internal error: get paths from key %q of %#v", tpKey, podSpreadCache.tpKeyToCriticalPaths)
continue
}
// judging criteria:
// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
matchNum := len(topologyPairsPodSpreadMap.topologyPairToPods[pair])
// cast to int to avoid potential overflow.
skew := matchNum + selfMatchNum - int(minMatchNum)
if skew > int(constraint.MaxSkew) {
minMatchNum := paths[0].matchNum
matchNum := podSpreadCache.tpPairToMatchNum[pair]
skew := matchNum + selfMatchNum - minMatchNum
if skew > constraint.MaxSkew {
klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: matchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, constraint.MaxSkew)
return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
}
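
A worked instance of this criteria with hypothetical numbers: 3 matching pods in zone1 and 1 in zone2, hence a global minimum of 1, and maxSkew = 1. A standalone sketch:

package main

import "fmt"

func main() {
	minMatchNum, selfMatchNum, maxSkew := int32(1), int32(1), int32(1)

	skewOnZone1 := int32(3) + selfMatchNum - minMatchNum // 3 + 1 - 1 = 3
	skewOnZone2 := int32(1) + selfMatchNum - minMatchNum // 1 + 1 - 1 = 1

	fmt.Println(skewOnZone1 > maxSkew) // true: a zone1 node violates the constraint
	fmt.Println(skewOnZone2 > maxSkew) // false: a zone2 node passes
}
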


@ -38,15 +38,15 @@ type topologyPair struct {
type topologySpreadConstraintsMap struct {
// nodeNameToPodCounts is keyed with node name, and valued with the number of matching pods.
nodeNameToPodCounts map[string]int64
nodeNameToPodCounts map[string]int32
// topologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
topologyPairToPodCounts map[topologyPair]*int64
topologyPairToPodCounts map[topologyPair]*int32
}
func newTopologySpreadConstraintsMap() *topologySpreadConstraintsMap {
return &topologySpreadConstraintsMap{
nodeNameToPodCounts: make(map[string]int64),
topologyPairToPodCounts: make(map[topologyPair]*int64),
nodeNameToPodCounts: make(map[string]int32),
topologyPairToPodCounts: make(map[topologyPair]*int32),
}
}
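
The map values are pointers so that parallel workers can bump a counter with sync/atomic (see the atomic.AddInt32 call below) instead of serializing on a lock. A minimal standalone sketch of that pattern, with a made-up "zone1" key:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	// counters are allocated up front, so the map itself is never written concurrently
	counts := map[string]*int32{"zone1": new(int32)}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt32(counts["zone1"], 1) // lock-free increment
		}()
	}
	wg.Wait()
	fmt.Println(*counts["zone1"]) // 100
}
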
@ -54,7 +54,7 @@ func newTopologySpreadConstraintsMap() *topologySpreadConstraintsMap {
// This function iterates <nodes> to filter out the nodes which don't have required topologyKey(s),
// and initializes two maps:
// 1) t.topologyPairToPodCounts: keyed with both eligible topology pair and node names.
// 2) t.nodeNameToPodCounts: keyed with node name, and valued with a *int64 pointer for eligible node only.
// 2) t.nodeNameToPodCounts: keyed with node name, and valued with an int32 count, for eligible nodes only.
func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node) {
constraints := getSoftTopologySpreadConstraints(pod)
for _, node := range nodes {
@ -64,7 +64,7 @@ func (t *topologySpreadConstraintsMap) initialize(pod *v1.Pod, nodes []*v1.Node)
for _, constraint := range constraints {
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
if t.topologyPairToPodCounts[pair] == nil {
t.topologyPairToPodCounts[pair] = new(int64)
t.topologyPairToPodCounts[pair] = new(int32)
}
}
t.nodeNameToPodCounts[node.Name] = 0
@ -122,7 +122,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
}
// <matchSum> indicates how many pods (on current node) match the <constraint>.
matchSum := int64(0)
matchSum := int32(0)
for _, existingPod := range nodeInfo.Pods() {
match, err := predicates.PodMatchesSpreadConstraint(existingPod.Labels, constraint)
if err != nil {
@ -133,7 +133,7 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
matchSum++
}
}
atomic.AddInt64(t.topologyPairToPodCounts[pair], matchSum)
atomic.AddInt32(t.topologyPairToPodCounts[pair], matchSum)
}
}
workqueue.ParallelizeUntil(ctx, 16, len(allNodeNames), processAllNode)
@ -141,9 +141,9 @@ func CalculateEvenPodsSpreadPriority(pod *v1.Pod, nodeNameToInfo map[string]*sch
return nil, err
}
var minCount int64 = math.MaxInt64
var minCount int32 = math.MaxInt32
// <total> sums up the number of matching pods on each qualified topology pair
var total int64
var total int32
for _, node := range nodes {
if _, ok := t.nodeNameToPodCounts[node.Name]; !ok {
continue


@ -17,7 +17,6 @@ limitations under the License.
package priorities
import (
"fmt"
"reflect"
"testing"
@ -32,8 +31,8 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
name string
pod *v1.Pod
nodes []*v1.Node
wantNodeNameMap map[string]int64
wantTopologyPairMap map[topologyPair]*int64
wantNodeNameMap map[string]int32
wantTopologyPairMap map[topologyPair]*int32
}{
{
name: "normal case",
@ -46,17 +45,17 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
},
wantNodeNameMap: map[string]int64{
wantNodeNameMap: map[string]int32{
"node-a": 0,
"node-b": 0,
"node-x": 0,
},
wantTopologyPairMap: map[topologyPair]*int64{
{key: "zone", value: "zone1"}: new(int64),
{key: "zone", value: "zone2"}: new(int64),
{key: "node", value: "node-a"}: new(int64),
{key: "node", value: "node-b"}: new(int64),
{key: "node", value: "node-x"}: new(int64),
wantTopologyPairMap: map[topologyPair]*int32{
{key: "zone", value: "zone1"}: new(int32),
{key: "zone", value: "zone2"}: new(int32),
{key: "node", value: "node-a"}: new(int32),
{key: "node", value: "node-b"}: new(int32),
{key: "node", value: "node-x"}: new(int32),
},
},
{
@ -70,14 +69,14 @@ func Test_topologySpreadConstraintsMap_initialize(t *testing.T) {
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
st.MakeNode().Name("node-x").Label("node", "node-x").Obj(),
},
wantNodeNameMap: map[string]int64{
wantNodeNameMap: map[string]int32{
"node-a": 0,
"node-b": 0,
},
wantTopologyPairMap: map[topologyPair]*int64{
{key: "zone", value: "zone1"}: new(int64),
{key: "node", value: "node-a"}: new(int64),
{key: "node", value: "node-b"}: new(int64),
wantTopologyPairMap: map[topologyPair]*int32{
{key: "zone", value: "zone1"}: new(int32),
{key: "node", value: "node-a"}: new(int32),
{key: "node", value: "node-b"}: new(int32),
},
},
}
@ -445,43 +444,6 @@ func TestCalculateEvenPodsSpreadPriority(t *testing.T) {
}
}
func makeNodesAndPods(pod *v1.Pod, existingPodsNum, allNodesNum, filteredNodesNum int) (existingPods []*v1.Pod, allNodes []*v1.Node, filteredNodes []*v1.Node) {
var topologyKeys []string
var labels []string
// regions := 3
zones := 10
for _, c := range pod.Spec.TopologySpreadConstraints {
topologyKeys = append(topologyKeys, c.TopologyKey)
labels = append(labels, c.LabelSelector.MatchExpressions[0].Key)
}
// build nodes
for i := 0; i < allNodesNum; i++ {
nodeWrapper := st.MakeNode().Name(fmt.Sprintf("node%d", i))
for _, tpKey := range topologyKeys {
if tpKey == "zone" {
nodeWrapper = nodeWrapper.Label("zone", fmt.Sprintf("zone%d", i%zones))
} else if tpKey == "node" {
nodeWrapper = nodeWrapper.Label("node", fmt.Sprintf("node%d", i))
}
}
node := nodeWrapper.Obj()
allNodes = append(allNodes, node)
if len(filteredNodes) < filteredNodesNum {
filteredNodes = append(filteredNodes, node)
}
}
// build pods
for i := 0; i < existingPodsNum; i++ {
podWrapper := st.MakePod().Name(fmt.Sprintf("pod%d", i)).Node(fmt.Sprintf("node%d", i%allNodesNum))
// apply labels[0], labels[0,1], ..., labels[all] to each pod in turn
for _, label := range labels[:i%len(labels)+1] {
podWrapper = podWrapper.Label(label, "")
}
existingPods = append(existingPods, podWrapper.Obj())
}
return
}
func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
tests := []struct {
name string
@ -521,7 +483,7 @@ func BenchmarkTestCalculateEvenPodsSpreadPriority(b *testing.B) {
}
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes, filteredNodes := makeNodesAndPods(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
existingPods, allNodes, filteredNodes := st.MakeNodesAndPods(tt.pod, tt.existingPodsNum, tt.allNodesNum, tt.filteredNodesNum)
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(existingPods, allNodes)
b.ResetTimer()
for i := 0; i < b.N; i++ {


@ -1107,7 +1107,7 @@ func selectVictimsOnNode(
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
if meta != nil {
meta.RemovePod(rp)
meta.RemovePod(rp, nodeInfoCopy.Node())
}
}
addPod := func(ap *v1.Pod) {


@ -6,6 +6,7 @@ go_library(
name = "go_default_library",
srcs = [
"fake_lister.go",
"workload_prep.go",
"wrappers.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/testing",


@ -0,0 +1,67 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"fmt"
"k8s.io/api/core/v1"
)
// MakeNodesAndPods serves as a testing helper for the EvenPodsSpread feature.
// It builds a fake cluster containing running Pods and Nodes.
// The number of Pods and Nodes is determined by the input arguments.
// The specs of Pods and Nodes are generated with the following rules:
// - If `pod` has "node" as a topologyKey, each generated node gets a unique label: "node: node<i>".
// - If `pod` has "zone" as a topologyKey, each generated node gets a rotating label: "zone: zone[0-9]".
// - Depending on the "labelSelector.MatchExpressions[0].Key" of each topologySpreadConstraint in `pod`,
//   each generated pod is labeled with "key1", "key1,key2", ..., "key1,key2,...,keyN" in a rotating manner.
func MakeNodesAndPods(pod *v1.Pod, existingPodsNum, allNodesNum, filteredNodesNum int) (existingPods []*v1.Pod, allNodes []*v1.Node, filteredNodes []*v1.Node) {
var topologyKeys []string
var labels []string
zones := 10
for _, c := range pod.Spec.TopologySpreadConstraints {
topologyKeys = append(topologyKeys, c.TopologyKey)
labels = append(labels, c.LabelSelector.MatchExpressions[0].Key)
}
// build nodes
for i := 0; i < allNodesNum; i++ {
nodeWrapper := MakeNode().Name(fmt.Sprintf("node%d", i))
for _, tpKey := range topologyKeys {
if tpKey == "zone" {
nodeWrapper = nodeWrapper.Label("zone", fmt.Sprintf("zone%d", i%zones))
} else if tpKey == "node" {
nodeWrapper = nodeWrapper.Label("node", fmt.Sprintf("node%d", i))
}
}
node := nodeWrapper.Obj()
allNodes = append(allNodes, node)
if len(filteredNodes) < filteredNodesNum {
filteredNodes = append(filteredNodes, node)
}
}
// build pods
for i := 0; i < existingPodsNum; i++ {
podWrapper := MakePod().Name(fmt.Sprintf("pod%d", i)).Node(fmt.Sprintf("node%d", i%allNodesNum))
// apply labels[0:1], labels[0:2], ..., labels[0:len(labels)] to each pod in turn
for _, label := range labels[:i%len(labels)+1] {
podWrapper = podWrapper.Label(label, "")
}
existingPods = append(existingPods, podWrapper.Obj())
}
return
}
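
To make the label-rotation rule concrete, a tiny standalone sketch with made-up selector keys:

package main

import "fmt"

func main() {
	labels := []string{"foo", "bar"} // hypothetical constraint selector keys
	for i := 0; i < 4; i++ {
		// pod0 gets [foo], pod1 gets [foo bar], pod2 gets [foo], pod3 gets [foo bar]
		fmt.Printf("pod%d labels: %v\n", i, labels[:i%len(labels)+1])
	}
}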