diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 8dfb3e86558..81085b72a7b 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -637,38 +637,36 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al return affinity.CheckServiceAffinity } -// CheckServiceAffinity ensures that only the nodes that match the specified labels are considered for scheduling. -// The set of labels to be considered are provided to the struct (ServiceAffinity). -// The pod is checked for the labels and any missing labels are then checked in the node -// that hosts the service pods (peers) for the given pod. +// CheckServiceAffinity is a predicate which matches nodes in such a way to force that +// ServiceAffinity.labels are homogeneous for pods added to a node. +// (i.e. it returns true IFF this pod can be added to this node, such +// that all other pods in the same service are running on nodes w/ +// the exact same ServiceAffinity.label values). // -// We add an implicit selector requiring some particular value V for label L to a pod, if: -// - L is listed in the ServiceAffinity object that is passed into the function -// - the pod does not have any NodeSelector for L -// - some other pod from the same service is already scheduled onto a node that has value V for label L +// Details: +// +// If (the svc affinity labels are a subset of the keys in the pod's node selector) +// The pod has all information necessary to check affinity, the pod's node selector is sufficient to calculate +// the match. +// Otherwise: +// Create an "implicit selector" which guarantees pods will land on nodes with similar values +// for the affinity labels. +// To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace. 
+// These backfilled labels in the selector "L" are defined like so: +// - L is a label that the ServiceAffinity object needs as a matching constraint. +// - L is not defined in the pod's NodeSelector already. +// - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value. func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { node := nodeInfo.Node() if node == nil { return false, nil, fmt.Errorf("node not found") } - var affinitySelector labels.Selector - // check if the pod being scheduled has the affinity labels specified in its NodeSelector - affinityLabels := map[string]string{} - nodeSelector := labels.Set(pod.Spec.NodeSelector) - labelsExist := true - for _, l := range s.labels { - if nodeSelector.Has(l) { - affinityLabels[l] = nodeSelector.Get(l) - } else { - // the current pod does not specify all the labels, look in the existing service pods - labelsExist = false - } - } + affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector)) - // skip looking at other pods in the service if the current pod defines all the required affinity labels - if !labelsExist { + // Introspect services IFF we didn't predefine all the affinity labels in the pod itself. 
+ if len(s.labels) > len(affinityLabels) { services, err := s.serviceLister.GetPodServices(pod) if err == nil && len(services) > 0 { // just use the first service and get the other pods within the service @@ -679,40 +677,20 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n return false, nil, err } // consider only the pods that belong to the same namespace - nsServicePods := []*api.Pod{} - for _, nsPod := range servicePods { - if nsPod.Namespace == pod.Namespace { - nsServicePods = append(nsServicePods, nsPod) - } - } + nsServicePods := FilterPodsByNamespace(servicePods, pod.Namespace) if len(nsServicePods) > 0 { // consider any service pod and fetch the node its hosted on otherNode, err := s.nodeInfo.GetNodeInfo(nsServicePods[0].Spec.NodeName) if err != nil { return false, nil, err } - for _, l := range s.labels { - // If the pod being scheduled has the label value specified, do not override it - if _, exists := affinityLabels[l]; exists { - continue - } - if labels.Set(otherNode.Labels).Has(l) { - affinityLabels[l] = labels.Set(otherNode.Labels).Get(l) - } - } + AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(otherNode.Labels)) } } } - // if there are no existing pods in the service, consider all nodes - if len(affinityLabels) == 0 { - affinitySelector = labels.Everything() - } else { - affinitySelector = labels.Set(affinityLabels).AsSelector() - } - // check if the node matches the selector - if affinitySelector.Matches(labels.Set(node.Labels)) { + if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) { return true, nil, nil } return false, []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}, nil diff --git a/plugin/pkg/scheduler/algorithm/predicates/utils.go b/plugin/pkg/scheduler/algorithm/predicates/utils.go new file mode 100644 index 00000000000..e5a1faeec25 --- /dev/null +++ b/plugin/pkg/scheduler/algorithm/predicates/utils.go @@ -0,0 +1,64 @@ +/* +Copyright 2016 The Kubernetes 
Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package predicates + +import "k8s.io/kubernetes/pkg/labels" +import "k8s.io/kubernetes/pkg/api" + +// FindLabelsInSet returns the key/value pairs of a label set whose keys are listed in labelsToKeep. +func FindLabelsInSet(labelsToKeep []string, selector labels.Set) map[string]string { + aL := make(map[string]string) + for _, l := range labelsToKeep { + if selector.Has(l) { + aL[l] = selector.Get(l) + } + } + return aL +} + +// AddUnsetLabelsToMap backfills missing values with values we find in a map. +func AddUnsetLabelsToMap(aL map[string]string, labelsToAdd []string, labelSet labels.Set) { + for _, l := range labelsToAdd { + // if the label is already there, don't overwrite it. + if _, exists := aL[l]; exists { + continue + } + // otherwise, backfill this label. + if labelSet.Has(l) { + aL[l] = labelSet.Get(l) + } + } +} + +// FilterPodsByNamespace returns the pods from the given list that are in the given namespace. +func FilterPodsByNamespace(pods []*api.Pod, ns string) []*api.Pod { + filtered := []*api.Pod{} + for _, nsPod := range pods { + if nsPod.Namespace == ns { + filtered = append(filtered, nsPod) + } + } + return filtered +} + +// CreateSelectorFromLabels returns a selector built from the key/value pairs in a map, or labels.Everything() if the map is nil or empty. 
+func CreateSelectorFromLabels(aL map[string]string) labels.Selector { + if aL == nil || len(aL) == 0 { + return labels.Everything() + } + return labels.Set(aL).AsSelector() +} diff --git a/plugin/pkg/scheduler/algorithm/predicates/utils_test.go b/plugin/pkg/scheduler/algorithm/predicates/utils_test.go new file mode 100644 index 00000000000..d3d32e12bfb --- /dev/null +++ b/plugin/pkg/scheduler/algorithm/predicates/utils_test.go @@ -0,0 +1,65 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package predicates + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/labels" +) + +// ExampleFindLabelsInSet is a https://blog.golang.org/examples styled unit test. +func ExampleFindLabelsInSet() { + labelSubset := labels.Set{} + labelSubset["label1"] = "value1" + labelSubset["label2"] = "value2" + // Let's make believe that these pods are on the cluster. + // Utility functions will inspect their labels, filter them, and so on. + nsPods := []*api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "pod1", + Namespace: "ns1", + Labels: map[string]string{ + "label1": "wontSeeThis", + "label2": "wontSeeThis", + "label3": "will_see_this", + }, + }, + }, // first pod which will be used via the utilities + { + ObjectMeta: api.ObjectMeta{ + Name: "pod2", + Namespace: "ns1", + }, + }, + + {}, // a third pod which will have no effect on anything. 
+ } + fmt.Println(FindLabelsInSet([]string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)["label3"]) + AddUnsetLabelsToMap(labelSubset, []string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels) + fmt.Println(labelSubset) + + for _, pod := range FilterPodsByNamespace(nsPods, "ns1") { + fmt.Print(pod.Name, ",") + } + // Output: + // will_see_this + // label1=value1,label2=value2,label3=will_see_this + // pod1,pod2, +} diff --git a/plugin/pkg/scheduler/algorithm/priorities/metadata.go b/plugin/pkg/scheduler/algorithm/priorities/metadata.go index 5517e0a8491..d41b34bfde4 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/metadata.go +++ b/plugin/pkg/scheduler/algorithm/priorities/metadata.go @@ -28,7 +28,8 @@ type priorityMetadata struct { affinity *api.Affinity } -func PriorityMetadata(pod *api.Pod) interface{} { +// PriorityMetadata is a MetadataProducer. Node info can be nil. +func PriorityMetadata(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} { // If we cannot compute metadata, just return nil if pod == nil { return nil diff --git a/plugin/pkg/scheduler/algorithm/types.go b/plugin/pkg/scheduler/algorithm/types.go index f1982fe27de..f0983774692 100644 --- a/plugin/pkg/scheduler/algorithm/types.go +++ b/plugin/pkg/scheduler/algorithm/types.go @@ -39,7 +39,7 @@ type PriorityMapFunction func(pod *api.Pod, meta interface{}, nodeInfo *schedule type PriorityReduceFunction func(pod *api.Pod, result schedulerapi.HostPriorityList) error // MetdataProducer is a function that computes metadata for a given pod. -type MetadataProducer func(pod *api.Pod) interface{} +type MetadataProducer func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} // DEPRECATED // Use Map-Reduce pattern for priority functions. 
@@ -54,7 +54,7 @@ type PriorityConfig struct { Weight int } -func EmptyMetadataProducer(pod *api.Pod) interface{} { +func EmptyMetadataProducer(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} { return nil } diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index 169592066f3..8f2e25e17b6 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -113,8 +113,8 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe } trace.Step("Prioritizing") - meta := g.priorityMetaProducer(pod) - priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, meta, g.prioritizers, filteredNodes, g.extenders) + metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap) + priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders) if err != nil { return "", err }