Merge pull request #50805 from bsalamat/preemption_metacompute

Automatic merge from submit-queue

Add support to modify precomputed predicate metadata upon addition/removal of a pod

**What this PR does / why we need it**: This PR adds the capability to modify precomputed predicate metadata, letting us add or remove pods from the precomputed metadata efficiently without recomputing everything on every pod addition or removal. This PR is needed as part of adding preemption logic to the scheduler.
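
As a rough illustration of the idea (a minimal sketch with hypothetical names, not the scheduler's actual types or API), keying the precomputed matching terms by pod full name is what makes incremental updates cheap: adding or removing a pod only touches that pod's entry instead of forcing a full recomputation.

```go
// Hypothetical, simplified sketch: it only illustrates how precomputed
// metadata can be updated incrementally when a pod is added or removed.
// The real scheduler types (predicateMetadata, matchingPodAntiAffinityTerm)
// are richer; these names are stand-ins.
package main

import "fmt"

// antiAffinityTerm stands in for matchingPodAntiAffinityTerm.
type antiAffinityTerm struct {
	topologyKey string
	nodeName    string
}

// metadata mirrors the idea of predicateMetadata: matching terms are keyed
// by the existing pod's full name, so one pod's contribution can be dropped
// without touching anything else.
type metadata struct {
	matchingAntiAffinityTerms map[string][]antiAffinityTerm
}

// addPod records the terms contributed by one existing pod; in the scheduler
// these would be derived from that pod's PodAntiAffinity against the new pod.
func (m *metadata) addPod(podFullName string, terms []antiAffinityTerm) {
	if len(terms) > 0 {
		m.matchingAntiAffinityTerms[podFullName] = append(m.matchingAntiAffinityTerms[podFullName], terms...)
	}
}

// removePod deletes only that pod's precomputed terms, which is what makes
// "what happens if this pod is preempted" checks cheap.
func (m *metadata) removePod(podFullName string) {
	delete(m.matchingAntiAffinityTerms, podFullName)
}

func main() {
	m := &metadata{matchingAntiAffinityTerms: map[string][]antiAffinityTerm{}}
	m.addPod("default/victim", []antiAffinityTerm{{topologyKey: "kubernetes.io/hostname", nodeName: "node-1"}})
	m.removePod("default/victim") // simulate removing a preemption victim
	fmt.Println(len(m.matchingAntiAffinityTerms)) // 0: metadata updated in place
}
```

In the actual diff below, the per-pod key comes from `schedutil.GetPodFullName`, and `getMatchingAntiAffinityTerms` now returns a `map[string][]matchingPodAntiAffinityTerm` instead of a flat slice, which is what enables this per-pod add/remove pattern.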

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

**Special notes for your reviewer**:
To make the review process a bit easier, there are three commits. The cleanup commit only moves code and renames some functions, with no logic changes.

**Release note**:

```release-note
NONE
```
ref/ #47604
ref/ #48646

/assign @wojtek-t 

@kubernetes/sig-scheduling-pr-reviews @davidopp
This commit is contained in:
Kubernetes Submit Queue, 2017-08-28 05:11:19 -07:00, committed by GitHub
14 changed files with 594 additions and 81 deletions


@@ -44,18 +44,6 @@ import (
"k8s.io/metrics/pkg/client/clientset_generated/clientset"
)
// PredicateMetadataModifier: Helper types/variables...
type PredicateMetadataModifier func(pm *predicateMetadata)
var predicatePrecomputeRegisterLock sync.Mutex
var predicatePrecomputations map[string]PredicateMetadataModifier = make(map[string]PredicateMetadataModifier)
func RegisterPredicatePrecomputation(predicateName string, precomp PredicateMetadataModifier) {
predicatePrecomputeRegisterLock.Lock()
defer predicatePrecomputeRegisterLock.Unlock()
predicatePrecomputations[predicateName] = precomp
}
// NodeInfo: Other types for predicate functions...
type NodeInfo interface {
GetNodeInfo(nodeID string) (*v1.Node, error)
@@ -107,23 +95,6 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*v1.Node, error) {
return node, nil
}
// Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file
// due to the way declarations are processed in predicate declaration unit tests.
type matchingPodAntiAffinityTerm struct {
term *v1.PodAffinityTerm
node *v1.Node
}
type predicateMetadata struct {
pod *v1.Pod
podBestEffort bool
podRequest *schedulercache.Resource
podPorts map[int]bool
matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
serviceAffinityMatchingPodList []*v1.Pod
serviceAffinityMatchingPodServices []*v1.Service
}
func isVolumeConflict(volume v1.Volume, pod *v1.Pod) bool {
// fast path if there is no conflict checking targets.
if volume.GCEPersistentDisk == nil && volume.AWSElasticBlockStore == nil && volume.RBD == nil && volume.ISCSI == nil {
@@ -738,43 +709,42 @@ type ServiceAffinity struct {
labels []string
}
// serviceAffinityPrecomputation should be run once by the scheduler before looping through the Predicate. It is a helper function that
// serviceAffinityMetadataProducer should be run once by the scheduler before looping through the Predicate. It is a helper function that
// only should be referenced by NewServiceAffinityPredicate.
func (s *ServiceAffinity) serviceAffinityPrecomputation(pm *predicateMetadata) {
func (s *ServiceAffinity) serviceAffinityMetadataProducer(pm *predicateMetadata) {
if pm.pod == nil {
glog.Errorf("Cannot precompute service affinity, a pod is required to calculate service affinity.")
return
}
pm.serviceAffinityInUse = true
var errSvc, errList error
// Store services which match the pod.
pm.serviceAffinityMatchingPodServices, errSvc = s.serviceLister.GetPodServices(pm.pod)
selector := CreateSelectorFromLabels(pm.pod.Labels)
// consider only the pods that belong to the same namespace
allMatches, errList := s.podLister.List(selector)
// In the future maybe we will return them as part of the function.
if errSvc != nil || errList != nil {
glog.Errorf("Some Error were found while precomputing svc affinity: \nservices:%v , \npods:%v", errSvc, errList)
}
// consider only the pods that belong to the same namespace
pm.serviceAffinityMatchingPodList = FilterPodsByNamespace(allMatches, pm.pod.Namespace)
}
func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) (algorithm.FitPredicate, PredicateMetadataModifier) {
func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) (algorithm.FitPredicate, PredicateMetadataProducer) {
affinity := &ServiceAffinity{
podLister: podLister,
serviceLister: serviceLister,
nodeInfo: nodeInfo,
labels: labels,
}
return affinity.checkServiceAffinity, affinity.serviceAffinityPrecomputation
return affinity.checkServiceAffinity, affinity.serviceAffinityMetadataProducer
}
// checkServiceAffinity is a predicate which matches nodes in such a way to force that
// ServiceAffinity.labels are homogenous for pods that are scheduled to a node.
// (i.e. it returns true IFF this pod can be added to this node such that all other pods in
// the same service are running on nodes with
// the exact same ServiceAffinity.label values).
// the same service are running on nodes with the exact same ServiceAffinity.label values).
//
// For example:
// If the first pod of a service was scheduled to a node with label "region=foo",
@@ -807,7 +777,7 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta interface{}, no
} else {
// Make the predicate resilient in case metadata is missing.
pm = &predicateMetadata{pod: pod}
s.serviceAffinityPrecomputation(pm)
s.serviceAffinityMetadataProducer(pm)
pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices
}
node := nodeInfo.Node()
@@ -964,7 +934,7 @@ func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta interface
if node == nil {
return false, nil, fmt.Errorf("node not found")
}
if !c.satisfiesExistingPodsAntiAffinity(pod, meta, node) {
if !c.satisfiesExistingPodsAntiAffinity(pod, meta, nodeInfo) {
return false, []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, nil
}
@@ -973,7 +943,7 @@ func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta interface
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
return true, nil, nil
}
if !c.satisfiesPodsAffinityAntiAffinity(pod, node, affinity) {
if !c.satisfiesPodsAffinityAntiAffinity(pod, nodeInfo, affinity) {
return false, []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, nil
}
@@ -1042,19 +1012,21 @@ func getPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.Po
return terms
}
func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) ([]matchingPodAntiAffinityTerm, error) {
func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]matchingPodAntiAffinityTerm, error) {
allNodeNames := make([]string, 0, len(nodeInfoMap))
for name := range nodeInfoMap {
allNodeNames = append(allNodeNames, name)
}
var lock sync.Mutex
var result []matchingPodAntiAffinityTerm
var firstError error
appendResult := func(toAppend []matchingPodAntiAffinityTerm) {
result := make(map[string][]matchingPodAntiAffinityTerm)
appendResult := func(toAppend map[string][]matchingPodAntiAffinityTerm) {
lock.Lock()
defer lock.Unlock()
result = append(result, toAppend...)
for uid, terms := range toAppend {
result[uid] = append(result[uid], terms...)
}
}
catchError := func(err error) {
lock.Lock()
@@ -1071,7 +1043,7 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
catchError(fmt.Errorf("node not found"))
return
}
var nodeResult []matchingPodAntiAffinityTerm
nodeResult := make(map[string][]matchingPodAntiAffinityTerm)
for _, existingPod := range nodeInfo.PodsWithAffinity() {
affinity := existingPod.Spec.Affinity
if affinity == nil {
@@ -1085,7 +1057,10 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
return
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
nodeResult = append(nodeResult, matchingPodAntiAffinityTerm{term: &term, node: node})
existingPodFullName := schedutil.GetPodFullName(existingPod)
nodeResult[existingPodFullName] = append(
nodeResult[existingPodFullName],
matchingPodAntiAffinityTerm{term: &term, node: node})
}
}
}
@@ -1097,8 +1072,26 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
return result, firstError
}
func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) ([]matchingPodAntiAffinityTerm, error) {
func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) ([]matchingPodAntiAffinityTerm, error) {
var result []matchingPodAntiAffinityTerm
affinity := existingPod.Spec.Affinity
if affinity != nil && affinity.PodAntiAffinity != nil {
for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
result = append(result, matchingPodAntiAffinityTerm{term: &term, node: node})
}
}
}
return result, nil
}
func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) (map[string][]matchingPodAntiAffinityTerm, error) {
result := make(map[string][]matchingPodAntiAffinityTerm)
for _, existingPod := range allPods {
affinity := existingPod.Spec.Affinity
if affinity != nil && affinity.PodAntiAffinity != nil {
@@ -1106,15 +1099,13 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [
if err != nil {
return nil, err
}
for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
result = append(result, matchingPodAntiAffinityTerm{term: &term, node: existingPodNode})
}
existingPodMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode)
if err != nil {
return nil, err
}
if len(existingPodMatchingTerms) > 0 {
existingPodFullName := schedutil.GetPodFullName(existingPod)
result[existingPodFullName] = existingPodMatchingTerms
}
}
}
@@ -1123,30 +1114,39 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [
// Checks if scheduling the pod onto this node would break any anti-affinity
// rules indicated by the existing pods.
func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta interface{}, node *v1.Node) bool {
var matchingTerms []matchingPodAntiAffinityTerm
func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) bool {
node := nodeInfo.Node()
if node == nil {
return false
}
var matchingTerms map[string][]matchingPodAntiAffinityTerm
if predicateMeta, ok := meta.(*predicateMetadata); ok {
matchingTerms = predicateMeta.matchingAntiAffinityTerms
} else {
allPods, err := c.podLister.List(labels.Everything())
// Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
// present in nodeInfo. Pods on other nodes pass the filter.
filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
glog.Errorf("Failed to get all pods, %+v", err)
return false
}
if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, allPods); err != nil {
if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, filteredPods); err != nil {
glog.Errorf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
return false
}
}
for _, term := range matchingTerms {
if len(term.term.TopologyKey) == 0 {
glog.Error("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
return false
}
if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v",
podName(pod), node.Name, term.term)
return false
for _, terms := range matchingTerms {
for i := range terms {
term := &terms[i]
if len(term.term.TopologyKey) == 0 {
glog.Error("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
return false
}
if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v",
podName(pod), node.Name, term.term)
return false
}
}
}
if glog.V(10) {
@@ -1159,15 +1159,19 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
}
// Checks if scheduling the pod onto this node would break any rules of this pod.
func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node *v1.Node, affinity *v1.Affinity) bool {
allPods, err := c.podLister.List(labels.Everything())
func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo, affinity *v1.Affinity) bool {
node := nodeInfo.Node()
if node == nil {
return false
}
filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
return false
}
// Check all affinity terms.
for _, term := range getPodAffinityTerms(affinity.PodAffinity) {
termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term)
termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, node, &term)
if err != nil {
glog.Errorf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err)
@@ -1200,7 +1204,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node
// Check all anti-affinity terms.
for _, term := range getPodAntiAffinityTerms(affinity.PodAntiAffinity) {
termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, allPods, node, &term)
termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, node, &term)
if err != nil || termMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err)
@@ -1217,7 +1221,7 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, node
return true
}
// PodToleratesNodeTaints checks if a pod tolertaions can tolerate the node taints
// PodToleratesNodeTaints checks if a pod tolerations can tolerate the node taints
func PodToleratesNodeTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
@@ -1225,7 +1229,7 @@ func PodToleratesNodeTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulerca
})
}
// PodToleratesNodeNoExecuteTaints checks if a pod tolertaions can tolerate the node's NoExecute taints
// PodToleratesNodeNoExecuteTaints checks if a pod tolerations can tolerate the node's NoExecute taints
func PodToleratesNodeNoExecuteTaints(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoExecute
@@ -1264,7 +1268,7 @@ func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *s
return true, nil, nil
}
// check if node is under memory preasure
// check if node is under memory pressure
if nodeInfo.MemoryPressureCondition() == v1.ConditionTrue {
return false, []algorithm.PredicateFailureReason{ErrNodeUnderMemoryPressure}, nil
}
@@ -1274,7 +1278,7 @@ func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *s
// CheckNodeDiskPressurePredicate checks if a pod can be scheduled on a node
// reporting disk pressure condition.
func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
// check if node is under disk preasure
// check if node is under disk pressure
if nodeInfo.DiskPressureCondition() == v1.ConditionTrue {
return false, []algorithm.PredicateFailureReason{ErrNodeUnderDiskPressure}, nil
}