mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
further enhancements removing matchingTerms from metadata
This commit is contained in:
parent
3fb6912d08
commit
f6659e4543
@ -38,6 +38,12 @@ type PredicateMetadataFactory struct {
|
|||||||
podLister algorithm.PodLister
|
podLister algorithm.PodLister
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AntiAffinityTerm's topology key value used in predicate metadata
|
||||||
|
type topologyPair struct {
|
||||||
|
key string
|
||||||
|
value string
|
||||||
|
}
|
||||||
|
|
||||||
// Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file
|
// Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file
|
||||||
// due to the way declarations are processed in predicate declaration unit tests.
|
// due to the way declarations are processed in predicate declaration unit tests.
|
||||||
type matchingPodAntiAffinityTerm struct {
|
type matchingPodAntiAffinityTerm struct {
|
||||||
@ -52,11 +58,11 @@ type predicateMetadata struct {
|
|||||||
podBestEffort bool
|
podBestEffort bool
|
||||||
podRequest *schedulercache.Resource
|
podRequest *schedulercache.Resource
|
||||||
podPorts []*v1.ContainerPort
|
podPorts []*v1.ContainerPort
|
||||||
//key is a pod full name with the anti-affinity rules.
|
// A map of antiffinity terms' topology pairs to the pods'
|
||||||
matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm
|
|
||||||
// A map of antiffinity terms' topology ke values to the pods' names
|
|
||||||
// that can potentially match the affinity rules of the pod
|
// that can potentially match the affinity rules of the pod
|
||||||
topologyValueToAntiAffinityPods map[string][]string
|
topologyPairToAntiAffinityPods map[topologyPair][]*v1.Pod
|
||||||
|
// Reverse map for topologyPairToAntiAffinityPods to reduce deletion time
|
||||||
|
antiAffinityPodToTopologyPairs map[string][]topologyPair
|
||||||
// A map of node name to a list of Pods on the node that can potentially match
|
// A map of node name to a list of Pods on the node that can potentially match
|
||||||
// the affinity rules of the "pod".
|
// the affinity rules of the "pod".
|
||||||
nodeNameToMatchingAffinityPods map[string][]*v1.Pod
|
nodeNameToMatchingAffinityPods map[string][]*v1.Pod
|
||||||
@ -116,7 +122,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
|
|||||||
if pod == nil {
|
if pod == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
matchingTerms, topologyValues, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap)
|
podToTopolgyPair, topologyPairToPods, err := getMatchingTopologyPairs(pod, nodeNameToInfoMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -130,10 +136,10 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
|
|||||||
podBestEffort: isPodBestEffort(pod),
|
podBestEffort: isPodBestEffort(pod),
|
||||||
podRequest: GetResourceRequest(pod),
|
podRequest: GetResourceRequest(pod),
|
||||||
podPorts: schedutil.GetContainerPorts(pod),
|
podPorts: schedutil.GetContainerPorts(pod),
|
||||||
matchingAntiAffinityTerms: matchingTerms,
|
|
||||||
nodeNameToMatchingAffinityPods: affinityPods,
|
nodeNameToMatchingAffinityPods: affinityPods,
|
||||||
nodeNameToMatchingAntiAffinityPods: antiAffinityPods,
|
nodeNameToMatchingAntiAffinityPods: antiAffinityPods,
|
||||||
topologyValueToAntiAffinityPods: topologyValues,
|
topologyPairToAntiAffinityPods: topologyPairToPods,
|
||||||
|
antiAffinityPodToTopologyPairs: podToTopolgyPair,
|
||||||
}
|
}
|
||||||
for predicateName, precomputeFunc := range predicateMetadataProducers {
|
for predicateName, precomputeFunc := range predicateMetadataProducers {
|
||||||
glog.V(10).Infof("Precompute: %v", predicateName)
|
glog.V(10).Infof("Precompute: %v", predicateName)
|
||||||
@ -149,23 +155,22 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
|
|||||||
if deletedPodFullName == schedutil.GetPodFullName(meta.pod) {
|
if deletedPodFullName == schedutil.GetPodFullName(meta.pod) {
|
||||||
return fmt.Errorf("deletedPod and meta.pod must not be the same")
|
return fmt.Errorf("deletedPod and meta.pod must not be the same")
|
||||||
}
|
}
|
||||||
|
// Delete pod from matching topology pairs map
|
||||||
// Delete pod from matching topology values map
|
for _, pair := range meta.antiAffinityPodToTopologyPairs[deletedPodFullName] {
|
||||||
for _, term := range meta.matchingAntiAffinityTerms[deletedPodFullName] {
|
for index, pod := range meta.topologyPairToAntiAffinityPods[pair] {
|
||||||
if topologyValue, ok := term.node.Labels[term.term.TopologyKey]; ok {
|
if schedutil.GetPodFullName(pod) == deletedPodFullName {
|
||||||
for index, podName := range meta.topologyValueToAntiAffinityPods[topologyValue] {
|
podsList := meta.topologyPairToAntiAffinityPods[pair]
|
||||||
if podName == deletedPodFullName {
|
podsList[index] = podsList[len(podsList)-1]
|
||||||
podsList := meta.topologyValueToAntiAffinityPods[topologyValue]
|
if len(podsList) <= 1 {
|
||||||
meta.topologyValueToAntiAffinityPods[topologyValue] = append(podsList[:index],
|
delete(meta.topologyPairToAntiAffinityPods, pair)
|
||||||
podsList[index+1:]...)
|
} else {
|
||||||
break
|
meta.topologyPairToAntiAffinityPods[pair] = podsList[:len(podsList)-1]
|
||||||
}
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
delete(meta.antiAffinityPodToTopologyPairs, deletedPodFullName)
|
||||||
// Delete any anti-affinity rule from the deletedPod.
|
|
||||||
delete(meta.matchingAntiAffinityTerms, deletedPodFullName)
|
|
||||||
// Delete pod from the matching affinity or anti-affinity pods if exists.
|
// Delete pod from the matching affinity or anti-affinity pods if exists.
|
||||||
affinity := meta.pod.Spec.Affinity
|
affinity := meta.pod.Spec.Affinity
|
||||||
podNodeName := deletedPod.Spec.NodeName
|
podNodeName := deletedPod.Spec.NodeName
|
||||||
@ -222,21 +227,16 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache
|
|||||||
return fmt.Errorf("invalid node in nodeInfo")
|
return fmt.Errorf("invalid node in nodeInfo")
|
||||||
}
|
}
|
||||||
// Add matching anti-affinity terms of the addedPod to the map.
|
// Add matching anti-affinity terms of the addedPod to the map.
|
||||||
podMatchingTerms, podTopologyValuesToMatchingPods, err := getMatchingAntiAffinityTermsOfExistingPod(meta.pod, addedPod, nodeInfo.Node())
|
matchingPodToTopologyPairs, podTopologyPairToMatchingPods, err := getMatchingTopologyPairsOfExistingPod(meta.pod, addedPod, nodeInfo.Node())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(podMatchingTerms) > 0 {
|
if len(matchingPodToTopologyPairs) > 0 {
|
||||||
existingTerms, found := meta.matchingAntiAffinityTerms[addedPodFullName]
|
for pair, pods := range podTopologyPairToMatchingPods {
|
||||||
if found {
|
meta.topologyPairToAntiAffinityPods[pair] = append(meta.topologyPairToAntiAffinityPods[pair], pods...)
|
||||||
meta.matchingAntiAffinityTerms[addedPodFullName] = append(existingTerms,
|
|
||||||
podMatchingTerms...)
|
|
||||||
} else {
|
|
||||||
meta.matchingAntiAffinityTerms[addedPodFullName] = podMatchingTerms
|
|
||||||
}
|
}
|
||||||
|
for pod, pairs := range matchingPodToTopologyPairs {
|
||||||
for topologyValue, pods := range podTopologyValuesToMatchingPods {
|
meta.antiAffinityPodToTopologyPairs[pod] = append(meta.antiAffinityPodToTopologyPairs[pod], pairs...)
|
||||||
meta.topologyValueToAntiAffinityPods[topologyValue] = append(meta.topologyValueToAntiAffinityPods[topologyValue], pods...)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed.
|
// Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed.
|
||||||
@ -291,10 +291,6 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
|
|||||||
ignoredExtendedResources: meta.ignoredExtendedResources,
|
ignoredExtendedResources: meta.ignoredExtendedResources,
|
||||||
}
|
}
|
||||||
newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...)
|
newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...)
|
||||||
newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{}
|
|
||||||
for k, v := range meta.matchingAntiAffinityTerms {
|
|
||||||
newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...)
|
|
||||||
}
|
|
||||||
newPredMeta.nodeNameToMatchingAffinityPods = make(map[string][]*v1.Pod)
|
newPredMeta.nodeNameToMatchingAffinityPods = make(map[string][]*v1.Pod)
|
||||||
for k, v := range meta.nodeNameToMatchingAffinityPods {
|
for k, v := range meta.nodeNameToMatchingAffinityPods {
|
||||||
newPredMeta.nodeNameToMatchingAffinityPods[k] = append([]*v1.Pod(nil), v...)
|
newPredMeta.nodeNameToMatchingAffinityPods[k] = append([]*v1.Pod(nil), v...)
|
||||||
@ -303,15 +299,18 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
|
|||||||
for k, v := range meta.nodeNameToMatchingAntiAffinityPods {
|
for k, v := range meta.nodeNameToMatchingAntiAffinityPods {
|
||||||
newPredMeta.nodeNameToMatchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...)
|
newPredMeta.nodeNameToMatchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...)
|
||||||
}
|
}
|
||||||
newPredMeta.topologyValueToAntiAffinityPods = make(map[string][]string)
|
newPredMeta.topologyPairToAntiAffinityPods = make(map[topologyPair][]*v1.Pod)
|
||||||
for k, v := range meta.topologyValueToAntiAffinityPods {
|
for k, v := range meta.topologyPairToAntiAffinityPods {
|
||||||
newPredMeta.topologyValueToAntiAffinityPods[k] = append([]string(nil), v...)
|
newPredMeta.topologyPairToAntiAffinityPods[k] = append([]*v1.Pod(nil), v...)
|
||||||
|
}
|
||||||
|
newPredMeta.antiAffinityPodToTopologyPairs = make(map[string][]topologyPair)
|
||||||
|
for k, v := range meta.antiAffinityPodToTopologyPairs {
|
||||||
|
newPredMeta.antiAffinityPodToTopologyPairs[k] = append([]topologyPair(nil), v...)
|
||||||
}
|
}
|
||||||
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
|
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
|
||||||
meta.serviceAffinityMatchingPodServices...)
|
meta.serviceAffinityMatchingPodServices...)
|
||||||
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
|
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
|
||||||
meta.serviceAffinityMatchingPodList...)
|
meta.serviceAffinityMatchingPodList...)
|
||||||
|
|
||||||
return (algorithm.PredicateMetadata)(newPredMeta)
|
return (algorithm.PredicateMetadata)(newPredMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,11 +113,6 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error {
|
|||||||
for !reflect.DeepEqual(meta1.podPorts, meta2.podPorts) {
|
for !reflect.DeepEqual(meta1.podPorts, meta2.podPorts) {
|
||||||
return fmt.Errorf("podPorts are not equal")
|
return fmt.Errorf("podPorts are not equal")
|
||||||
}
|
}
|
||||||
sortAntiAffinityTerms(meta1.matchingAntiAffinityTerms)
|
|
||||||
sortAntiAffinityTerms(meta2.matchingAntiAffinityTerms)
|
|
||||||
if !reflect.DeepEqual(meta1.matchingAntiAffinityTerms, meta2.matchingAntiAffinityTerms) {
|
|
||||||
return fmt.Errorf("matchingAntiAffinityTerms are not euqal")
|
|
||||||
}
|
|
||||||
sortNodePodMap(meta1.nodeNameToMatchingAffinityPods)
|
sortNodePodMap(meta1.nodeNameToMatchingAffinityPods)
|
||||||
sortNodePodMap(meta2.nodeNameToMatchingAffinityPods)
|
sortNodePodMap(meta2.nodeNameToMatchingAffinityPods)
|
||||||
if !reflect.DeepEqual(meta1.nodeNameToMatchingAffinityPods, meta2.nodeNameToMatchingAffinityPods) {
|
if !reflect.DeepEqual(meta1.nodeNameToMatchingAffinityPods, meta2.nodeNameToMatchingAffinityPods) {
|
||||||
@ -465,19 +460,25 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
|
|||||||
HostIP: "1.2.3.4",
|
HostIP: "1.2.3.4",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
matchingAntiAffinityTerms: map[string][]matchingPodAntiAffinityTerm{
|
topologyPairToAntiAffinityPods: map[topologyPair][]*v1.Pod{
|
||||||
"term1": {
|
{key: "name", value: "machine1"}: {
|
||||||
{
|
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2", Labels: selector1},
|
||||||
term: &v1.PodAffinityTerm{TopologyKey: "node"},
|
Spec: v1.PodSpec{NodeName: "nodeC"},
|
||||||
node: &v1.Node{
|
},
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
|
},
|
||||||
},
|
{key: "name", value: "machine2"}: {
|
||||||
|
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
|
||||||
|
Spec: v1.PodSpec{NodeName: "nodeA"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
topologyValueToAntiAffinityPods: map[string][]string{
|
antiAffinityPodToTopologyPairs: map[string][]topologyPair{
|
||||||
"machine1": {"p1", "p2"},
|
"p2": {
|
||||||
"machine2": {"p3"},
|
topologyPair{key: "name", value: "machine1"},
|
||||||
|
},
|
||||||
|
"p1": {
|
||||||
|
topologyPair{key: "name", value: "machine2"},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
nodeNameToMatchingAffinityPods: map[string][]*v1.Pod{
|
nodeNameToMatchingAffinityPods: map[string][]*v1.Pod{
|
||||||
"nodeA": {
|
"nodeA": {
|
||||||
|
@ -1246,7 +1246,7 @@ func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.Po
|
|||||||
return terms
|
return terms
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]matchingPodAntiAffinityTerm, map[string][]string, error) {
|
func getMatchingTopologyPairs(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]topologyPair, map[topologyPair][]*v1.Pod, error) {
|
||||||
allNodeNames := make([]string, 0, len(nodeInfoMap))
|
allNodeNames := make([]string, 0, len(nodeInfoMap))
|
||||||
for name := range nodeInfoMap {
|
for name := range nodeInfoMap {
|
||||||
allNodeNames = append(allNodeNames, name)
|
allNodeNames = append(allNodeNames, name)
|
||||||
@ -1254,25 +1254,25 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
|
|||||||
|
|
||||||
var lock sync.Mutex
|
var lock sync.Mutex
|
||||||
var firstError error
|
var firstError error
|
||||||
podsToMatchingAntiAffinityTerms := make(map[string][]matchingPodAntiAffinityTerm)
|
|
||||||
topologyValuesToMatchingPods := make(map[string][]string)
|
|
||||||
|
|
||||||
appendPodsMatchingAntiAffinityTerms := func(toAppend map[string][]matchingPodAntiAffinityTerm) {
|
topologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod)
|
||||||
|
matchingPodToTopologyPair := make(map[string][]topologyPair)
|
||||||
|
|
||||||
|
appendTopologyPairToMatchingPods := func(toAppend map[topologyPair][]*v1.Pod) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
for uid, terms := range toAppend {
|
for pair, pods := range toAppend {
|
||||||
podsToMatchingAntiAffinityTerms[uid] = append(podsToMatchingAntiAffinityTerms[uid], terms...)
|
topologyPairToMatchingPods[pair] = append(topologyPairToMatchingPods[pair], pods...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
appendTopologyValuesMatchingPods := func(toAppend map[string][]string) {
|
appendMatchingPodToTopologyPair := func(toAppend map[string][]topologyPair) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
for topologyValue, pods := range toAppend {
|
for pod, pairs := range toAppend {
|
||||||
topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], pods...)
|
matchingPodToTopologyPair[pod] = append(matchingPodToTopologyPair[pod], pairs...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
catchError := func(err error) {
|
catchError := func(err error) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
@ -1288,9 +1288,8 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
|
|||||||
catchError(fmt.Errorf("node not found"))
|
catchError(fmt.Errorf("node not found"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nodePodsToMatchingAntiAffinityTerms := make(map[string][]matchingPodAntiAffinityTerm)
|
nodeTopologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod)
|
||||||
nodeTopologyValuesToMatchingPods := make(map[string][]string)
|
nodeMatchingPodToTopologyPairs := make(map[string][]topologyPair)
|
||||||
|
|
||||||
for _, existingPod := range nodeInfo.PodsWithAffinity() {
|
for _, existingPod := range nodeInfo.PodsWithAffinity() {
|
||||||
affinity := existingPod.Spec.Affinity
|
affinity := existingPod.Spec.Affinity
|
||||||
if affinity == nil {
|
if affinity == nil {
|
||||||
@ -1304,31 +1303,27 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
|
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
|
||||||
existingPodFullName := schedutil.GetPodFullName(existingPod)
|
|
||||||
nodePodsToMatchingAntiAffinityTerms[existingPodFullName] = append(
|
|
||||||
nodePodsToMatchingAntiAffinityTerms[existingPodFullName],
|
|
||||||
matchingPodAntiAffinityTerm{term: &term, node: node})
|
|
||||||
|
|
||||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||||
nodeTopologyValuesToMatchingPods[topologyValue] = append(nodeTopologyValuesToMatchingPods[topologyValue], existingPodFullName)
|
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||||
|
nodeTopologyPairToMatchingPods[pair] = append(nodeTopologyPairToMatchingPods[pair], existingPod)
|
||||||
|
existingPodFullName := schedutil.GetPodFullName(existingPod)
|
||||||
|
nodeMatchingPodToTopologyPairs[existingPodFullName] = append(nodeMatchingPodToTopologyPairs[existingPodFullName], pair)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(nodePodsToMatchingAntiAffinityTerms) > 0 {
|
if len(nodeTopologyPairToMatchingPods) > 0 {
|
||||||
appendPodsMatchingAntiAffinityTerms(nodePodsToMatchingAntiAffinityTerms)
|
appendTopologyPairToMatchingPods(nodeTopologyPairToMatchingPods)
|
||||||
}
|
appendMatchingPodToTopologyPair(nodeMatchingPodToTopologyPairs)
|
||||||
if len(nodeTopologyValuesToMatchingPods) > 0 {
|
|
||||||
appendTopologyValuesMatchingPods(nodeTopologyValuesToMatchingPods)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
workqueue.Parallelize(16, len(allNodeNames), processNode)
|
workqueue.Parallelize(16, len(allNodeNames), processNode)
|
||||||
return podsToMatchingAntiAffinityTerms, topologyValuesToMatchingPods, firstError
|
return matchingPodToTopologyPair, topologyPairToMatchingPods, firstError
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) ([]matchingPodAntiAffinityTerm, map[string][]string, error) {
|
func getMatchingTopologyPairsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (map[string][]topologyPair, map[topologyPair][]*v1.Pod, error) {
|
||||||
var podMatchingTerms []matchingPodAntiAffinityTerm
|
topologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod)
|
||||||
topologyValuesToMatchingPods := make(map[string][]string)
|
matchingPodToTopologyPairs := make(map[string][]topologyPair)
|
||||||
|
|
||||||
affinity := existingPod.Spec.Affinity
|
affinity := existingPod.Spec.Affinity
|
||||||
if affinity != nil && affinity.PodAntiAffinity != nil {
|
if affinity != nil && affinity.PodAntiAffinity != nil {
|
||||||
@ -1339,21 +1334,21 @@ func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.P
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
|
if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
|
||||||
podMatchingTerms = append(podMatchingTerms, matchingPodAntiAffinityTerm{term: &term, node: node})
|
|
||||||
existingPodFullName := schedutil.GetPodFullName(existingPod)
|
|
||||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||||
topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], existingPodFullName)
|
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||||
|
topologyPairToMatchingPods[pair] = append(topologyPairToMatchingPods[pair], existingPod)
|
||||||
|
existingPodFullName := schedutil.GetPodFullName(existingPod)
|
||||||
|
matchingPodToTopologyPairs[existingPodFullName] = append(matchingPodToTopologyPairs[existingPodFullName], pair)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return podMatchingTerms, topologyValuesToMatchingPods, nil
|
return matchingPodToTopologyPairs, topologyPairToMatchingPods, nil
|
||||||
}
|
}
|
||||||
|
func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairs(pod *v1.Pod, allPods []*v1.Pod) (map[string][]topologyPair, map[topologyPair][]*v1.Pod, error) {
|
||||||
|
|
||||||
func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) (map[string][]matchingPodAntiAffinityTerm, map[string][]string, error) {
|
topologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod)
|
||||||
result := make(map[string][]matchingPodAntiAffinityTerm)
|
matchingPodToTopologyPairs := make(map[string][]topologyPair)
|
||||||
topologyValuesToMatchingPods := make(map[string][]string)
|
|
||||||
|
|
||||||
for _, existingPod := range allPods {
|
for _, existingPod := range allPods {
|
||||||
affinity := existingPod.Spec.Affinity
|
affinity := existingPod.Spec.Affinity
|
||||||
if affinity != nil && affinity.PodAntiAffinity != nil {
|
if affinity != nil && affinity.PodAntiAffinity != nil {
|
||||||
@ -1365,20 +1360,19 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [
|
|||||||
}
|
}
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
existingPodMatchingTerms, podTopologyValuesToMatchingPods, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode)
|
existingPodTopologyTerms, podTopologyPairToMatchingPods, err := getMatchingTopologyPairsOfExistingPod(pod, existingPod, existingPodNode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
if len(existingPodMatchingTerms) > 0 {
|
for pair, pods := range podTopologyPairToMatchingPods {
|
||||||
existingPodFullName := schedutil.GetPodFullName(existingPod)
|
topologyPairToMatchingPods[pair] = append(topologyPairToMatchingPods[pair], pods...)
|
||||||
result[existingPodFullName] = existingPodMatchingTerms
|
|
||||||
}
|
}
|
||||||
for topologyValue, pods := range podTopologyValuesToMatchingPods {
|
for pod, pairs := range existingPodTopologyTerms {
|
||||||
topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], pods...)
|
matchingPodToTopologyPairs[pod] = append(matchingPodToTopologyPairs[pod], pairs...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result, topologyValuesToMatchingPods, nil
|
return matchingPodToTopologyPairs, topologyPairToMatchingPods, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks if scheduling the pod onto this node would break any anti-affinity
|
// Checks if scheduling the pod onto this node would break any anti-affinity
|
||||||
@ -1388,12 +1382,10 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
|
|||||||
if node == nil {
|
if node == nil {
|
||||||
return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("Node is nil")
|
return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("Node is nil")
|
||||||
}
|
}
|
||||||
var matchingTerms map[string][]matchingPodAntiAffinityTerm
|
var topologyPairToMatchingPods map[topologyPair][]*v1.Pod
|
||||||
var topologyValuesToMatchingPods map[string][]string
|
|
||||||
|
|
||||||
if predicateMeta, ok := meta.(*predicateMetadata); ok {
|
if predicateMeta, ok := meta.(*predicateMetadata); ok {
|
||||||
matchingTerms = predicateMeta.matchingAntiAffinityTerms
|
topologyPairToMatchingPods = predicateMeta.topologyPairToAntiAffinityPods
|
||||||
topologyValuesToMatchingPods = predicateMeta.topologyValueToAntiAffinityPods
|
|
||||||
} else {
|
} else {
|
||||||
// Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
|
// Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
|
||||||
// present in nodeInfo. Pods on other nodes pass the filter.
|
// present in nodeInfo. Pods on other nodes pass the filter.
|
||||||
@ -1403,29 +1395,22 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
|
|||||||
glog.Error(errMessage)
|
glog.Error(errMessage)
|
||||||
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
||||||
}
|
}
|
||||||
if matchingTerms, topologyValuesToMatchingPods, err = c.getMatchingAntiAffinityTerms(pod, filteredPods); err != nil {
|
if _, topologyPairToMatchingPods, err = c.getMatchingAntiAffinityTopologyPairs(pod, filteredPods); err != nil {
|
||||||
errMessage := fmt.Sprintf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
|
errMessage := fmt.Sprintf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
|
||||||
glog.Error(errMessage)
|
glog.Error(errMessage)
|
||||||
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate over topology values, to get matching pods and get their matching terms to check for same topolgy key
|
// Iterate over topology topology pairs to get any of the pods being affected by
|
||||||
// currently ignored if predicateMetadata is not precomputed
|
// the scheduled pod anti-affinity rules
|
||||||
for _, topologyValue := range node.Labels {
|
for topologyKey, topologyValue := range node.Labels {
|
||||||
potentialPods := topologyValuesToMatchingPods[topologyValue]
|
if violatedPods, ok := topologyPairToMatchingPods[topologyPair{key: topologyKey, value: topologyValue}]; ok {
|
||||||
for _, matchingPod := range potentialPods {
|
affinity := violatedPods[0].Spec.Affinity
|
||||||
podTerms := matchingTerms[matchingPod]
|
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
|
||||||
for i := range podTerms {
|
if term.TopologyKey == topologyKey {
|
||||||
term := &podTerms[i]
|
|
||||||
if len(term.term.TopologyKey) == 0 {
|
|
||||||
errMessage := fmt.Sprintf("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
|
|
||||||
glog.Error(errMessage)
|
|
||||||
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
|
||||||
}
|
|
||||||
if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) {
|
|
||||||
glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v",
|
glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v",
|
||||||
podName(pod), node.Name, term.term)
|
podName(pod), node.Name, term)
|
||||||
return ErrExistingPodsAntiAffinityRulesNotMatch, nil
|
return ErrExistingPodsAntiAffinityRulesNotMatch, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user