Create PredicateFailureReason, modify scheduler predicate interface.

author mksalawa 2016-08-09 14:01:46 +02:00
parent 899d98ad15
commit 2749ec7555
10 changed files with 530 additions and 371 deletions
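The heart of the change is a new predicate contract: instead of smuggling scheduling failures through the error return, a predicate now reports them as a slice of PredicateFailureReason values, and the error is reserved for unexpected evaluation problems. A self-contained restatement of that contract as added to plugin/pkg/scheduler/algorithm (mirroring the types.go hunk near the end of this diff; the doc comments summarize how the new call sites in this commit treat the two return values):

package algorithm

import (
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// FitPredicate reports whether pod fits the node described by nodeInfo.
// Ordinary scheduling failures come back as PredicateFailureReason values;
// a non-nil error means evaluation itself failed unexpectedly.
type FitPredicate func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)

// PredicateFailureReason is satisfied by InsufficientResourceError,
// PredicateFailureError and the new FailureReason type in the predicates package.
type PredicateFailureReason interface {
    GetReason() string
}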


@ -693,7 +693,7 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *exte
if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
continue
}
// ignore pods that belong to the daemonset when taking into account wheter
// ignore pods that belong to the daemonset when taking into account whether
// a daemonset should bind to a node.
if pds := dsc.getPodDaemonSet(pod); pds != nil && ds.Name == pds.Name {
continue
@ -703,18 +703,12 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *exte
nodeInfo := schedulercache.NewNodeInfo(pods...)
nodeInfo.SetNode(node)
fit, err := predicates.GeneralPredicates(newPod, nil, nodeInfo)
fit, reasons, err := predicates.GeneralPredicates(newPod, nil, nodeInfo)
if err != nil {
if re, ok := err.(*predicates.PredicateFailureError); ok {
message := re.Error()
glog.V(2).Infof("Predicate failed on Pod: %s, for reason: %v", newPod.Name, message)
}
if re, ok := err.(*predicates.InsufficientResourceError); ok {
message := re.Error()
glog.V(2).Infof("Predicate failed on Pod: %s, for reason: %v", newPod.Name, message)
}
message := fmt.Sprintf("GeneralPredicates failed due to %v.", err)
glog.Warningf("Predicate failed on Pod %s - %s", newPod.Name, message)
glog.Warningf("GeneralPredicates failed on pod %s due to unexpected error: %v", newPod.Name, err)
}
for _, r := range reasons {
glog.V(2).Infof("GeneralPredicates failed on pod %s for reason: %v", newPod.Name, r.GetReason())
}
return fit
}


@ -2059,23 +2059,40 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
}
nodeInfo := schedulercache.NewNodeInfo(pods...)
nodeInfo.SetNode(node)
fit, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
if !fit {
if re, ok := err.(*predicates.PredicateFailureError); ok {
reason := re.PredicateName
message := re.Error()
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
return fit, reason, message
}
if re, ok := err.(*predicates.InsufficientResourceError); ok {
reason := fmt.Sprintf("OutOf%s", re.ResourceName)
message := re.Error()
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
return fit, reason, message
}
reason := "UnexpectedPredicateFailureType"
fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
if err != nil {
message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
return fit, "UnexpectedError", message
}
if !fit {
var reason string
var message string
if len(reasons) == 0 {
message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
return fit, "UnknownReason", message
}
// If there are failed predicates, we only return the first one as a reason.
r := reasons[0]
switch re := r.(type) {
case *predicates.PredicateFailureError:
reason = re.PredicateName
message = re.Error()
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
case *predicates.InsufficientResourceError:
reason = fmt.Sprintf("OutOf%s", re.ResourceName)
message = re.Error()
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
case *predicates.FailureReason:
reason = re.GetReason()
message = fmt.Sprintf("Failure: %s", re.GetReason())
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
default:
reason = "UnexpectedPredicateFailureType"
message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
}
return fit, reason, message
}
// TODO: When disk space scheduling is implemented (#11976), remove the out-of-disk check here and


@ -16,13 +16,9 @@ limitations under the License.
package predicates
import "fmt"
const (
podCountResourceName string = "PodCount"
cpuResourceName string = "CPU"
memoryResourceName string = "Memory"
nvidiaGpuResourceName string = "NvidiaGpu"
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
)
var (
@ -49,13 +45,13 @@ var (
// hit and caused the unfitting failure.
type InsufficientResourceError struct {
// resourceName is the name of the resource that is insufficient
ResourceName string
ResourceName api.ResourceName
requested int64
used int64
capacity int64
}
func newInsufficientResourceError(resourceName string, requested, used, capacity int64) *InsufficientResourceError {
func NewInsufficientResourceError(resourceName api.ResourceName, requested, used, capacity int64) *InsufficientResourceError {
return &InsufficientResourceError{
ResourceName: resourceName,
requested: requested,
@ -69,14 +65,34 @@ func (e *InsufficientResourceError) Error() string {
e.ResourceName, e.requested, e.used, e.capacity)
}
func (e *InsufficientResourceError) GetReason() string {
return fmt.Sprintf("Insufficient %v", e.ResourceName)
}
type PredicateFailureError struct {
PredicateName string
}
func newPredicateFailureError(predicateName string) *PredicateFailureError {
return &PredicateFailureError{predicateName}
return &PredicateFailureError{PredicateName: predicateName}
}
func (e *PredicateFailureError) Error() string {
return fmt.Sprintf("Predicate %s failed", e.PredicateName)
}
func (e *PredicateFailureError) GetReason() string {
return e.PredicateName
}
type FailureReason struct {
reason string
}
func NewFailureReason(msg string) *FailureReason {
return &FailureReason{reason: msg}
}
func (e *FailureReason) GetReason() string {
return e.reason
}
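All three reason types above implement the algorithm.PredicateFailureReason interface. A small sketch of what each reports through GetReason, written as if inside package predicates (so the unexported constructor is reachable); the FailureReason message is an arbitrary illustrative string:

// exampleReasons is illustrative only; it is not part of this commit.
func exampleReasons() {
    reasons := []algorithm.PredicateFailureReason{
        NewInsufficientResourceError(api.ResourceCPU, 2, 9, 10), // GetReason(): "Insufficient cpu"
        newPredicateFailureError("PodFitsHostPorts"),            // GetReason(): "PodFitsHostPorts"
        NewFailureReason("some free-form failure message"),      // GetReason(): the message verbatim
    }
    for _, r := range reasons {
        fmt.Println(r.GetReason())
    }
}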


@ -141,15 +141,15 @@ func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
// - AWS EBS forbids any two pods mounting the same volume ID
// - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image.
// TODO: migrate this into some per-volume specific code?
func NoDiskConflict(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func NoDiskConflict(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
for _, v := range pod.Spec.Volumes {
for _, ev := range nodeInfo.Pods() {
if isVolumeConflict(v, ev) {
return false, ErrDiskConflict
return false, []algorithm.PredicateFailureReason{ErrDiskConflict}, nil
}
}
}
return true, nil
return true, nil, nil
}
type MaxPDVolumeCountChecker struct {
@ -238,28 +238,28 @@ func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []api.Volume, namespace
return nil
}
func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
// If a pod doesn't have any volume attached to it, the predicate will always be true.
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
if len(pod.Spec.Volumes) == 0 {
return true, nil
return true, nil, nil
}
newVolumes := make(map[string]bool)
if err := c.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
return false, err
return false, nil, err
}
// quick return
if len(newVolumes) == 0 {
return true, nil
return true, nil, nil
}
// count unique volumes
existingVolumes := make(map[string]bool)
for _, existingPod := range nodeInfo.Pods() {
if err := c.filterVolumes(existingPod.Spec.Volumes, existingPod.Namespace, existingVolumes); err != nil {
return false, err
return false, nil, err
}
}
numExistingVolumes := len(existingVolumes)
@ -275,10 +275,10 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, meta interface{}, node
if numExistingVolumes+numNewVolumes > c.maxVolumes {
// violates MaxEBSVolumeCount or MaxGCEPDVolumeCount
return false, ErrMaxVolumeCountExceeded
return false, []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
}
return true, nil
return true, nil, nil
}
// EBSVolumeFilter is a VolumeFilter for filtering AWS ElasticBlockStore Volumes
@ -342,16 +342,16 @@ func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolum
return c.predicate
}
func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
// If a pod doesn't have any volume attached to it, the predicate will always be true.
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
if len(pod.Spec.Volumes) == 0 {
return true, nil
return true, nil, nil
}
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
nodeConstraints := make(map[string]string)
@ -366,40 +366,39 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *
// The node has no zone constraints, so we're OK to schedule.
// In practice, when using zones, all nodes must be labeled with zone labels.
// We want to fast-path this case though.
return true, nil
return true, nil, nil
}
namespace := pod.Namespace
manifest := &(pod.Spec)
for i := range manifest.Volumes {
volume := &manifest.Volumes[i]
if volume.PersistentVolumeClaim != nil {
pvcName := volume.PersistentVolumeClaim.ClaimName
if pvcName == "" {
return false, fmt.Errorf("PersistentVolumeClaim had no name")
return false, nil, fmt.Errorf("PersistentVolumeClaim had no name")
}
pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
if err != nil {
return false, err
return false, nil, err
}
if pvc == nil {
return false, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
}
pvName := pvc.Spec.VolumeName
if pvName == "" {
return false, fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName)
return false, nil, fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName)
}
pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
if err != nil {
return false, err
return false, nil, err
}
if pv == nil {
return false, fmt.Errorf("PersistentVolume not found: %q", pvName)
return false, nil, fmt.Errorf("PersistentVolume not found: %q", pvName)
}
for k, v := range pv.ObjectMeta.Labels {
@ -409,13 +408,13 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *
nodeV, _ := nodeConstraints[k]
if v != nodeV {
glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, node.Name, pvName, k)
return false, ErrVolumeZoneConflict
return false, []algorithm.PredicateFailureReason{ErrVolumeZoneConflict}, nil
}
}
}
}
return true, nil
return true, nil, nil
}
func getResourceRequest(pod *api.Pod) *schedulercache.Resource {
@ -443,15 +442,16 @@ func podName(pod *api.Pod) string {
return pod.Namespace + "/" + pod.Name
}
func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
var predicateFails []algorithm.PredicateFailureReason
allowedPodNumber := nodeInfo.AllowedPodNumber()
if len(nodeInfo.Pods())+1 > allowedPodNumber {
return false,
newInsufficientResourceError(podCountResourceName, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber))
predicateFails = append(predicateFails, NewInsufficientResourceError(api.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
}
var podRequest *schedulercache.Resource
@ -462,21 +462,18 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
podRequest = getResourceRequest(pod)
}
if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 {
return true, nil
return len(predicateFails) == 0, predicateFails, nil
}
allocatable := nodeInfo.AllocatableResource()
if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
return false,
newInsufficientResourceError(cpuResourceName, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU)
predicateFails = append(predicateFails, NewInsufficientResourceError(api.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
}
if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
return false,
newInsufficientResourceError(memoryResourceName, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory)
predicateFails = append(predicateFails, NewInsufficientResourceError(api.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
}
if allocatable.NvidiaGPU < podRequest.NvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU {
return false,
newInsufficientResourceError(nvidiaGpuResourceName, podRequest.NvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, allocatable.NvidiaGPU)
predicateFails = append(predicateFails, NewInsufficientResourceError(api.ResourceNvidiaGPU, podRequest.NvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, allocatable.NvidiaGPU))
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
@ -484,11 +481,11 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
glog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
}
return true, nil
return len(predicateFails) == 0, predicateFails, nil
}
// nodeMatchesNodeSelectorTerms checks if a node's labels satisfy a list of node selector terms,
// terms are ORed, and an emtpy a list of terms will match nothing.
// terms are ORed, and an empty list of terms will match nothing.
func nodeMatchesNodeSelectorTerms(node *api.Node, nodeSelectorTerms []api.NodeSelectorTerm) bool {
for _, req := range nodeSelectorTerms {
nodeSelector, err := api.NodeSelectorRequirementsAsSelector(req.MatchExpressions)
@ -556,29 +553,29 @@ func podMatchesNodeLabels(pod *api.Pod, node *api.Node) bool {
return nodeAffinityMatches
}
func PodSelectorMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func PodSelectorMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
if podMatchesNodeLabels(pod, node) {
return true, nil
return true, nil, nil
}
return false, ErrNodeSelectorNotMatch
return false, []algorithm.PredicateFailureReason{ErrNodeSelectorNotMatch}, nil
}
func PodFitsHost(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func PodFitsHost(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
if len(pod.Spec.NodeName) == 0 {
return true, nil
return true, nil, nil
}
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
if pod.Spec.NodeName == node.Name {
return true, nil
return true, nil, nil
}
return false, ErrPodNotMatchHostName
return false, []algorithm.PredicateFailureReason{ErrPodNotMatchHostName}, nil
}
type NodeLabelChecker struct {
@ -606,10 +603,10 @@ func NewNodeLabelPredicate(labels []string, presence bool) algorithm.FitPredicat
// Alternately, eliminating nodes that have a certain label, regardless of value, is also useful
// A node may have a label with "retiring" as key and the date as the value
// and it may be desirable to avoid scheduling new pods on this node
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
var exists bool
@ -617,10 +614,10 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, meta interface{}
for _, label := range n.labels {
exists = nodeLabels.Has(label)
if (exists && !n.presence) || (!exists && n.presence) {
return false, ErrNodeLabelPresenceViolated
return false, []algorithm.PredicateFailureReason{ErrNodeLabelPresenceViolated}, nil
}
}
return true, nil
return true, nil, nil
}
type ServiceAffinity struct {
@ -649,10 +646,10 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al
// - L is listed in the ServiceAffinity object that is passed into the function
// - the pod does not have any NodeSelector for L
// - some other pod from the same service is already scheduled onto a node that has value V for label L
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
var affinitySelector labels.Selector
@ -679,7 +676,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n
selector := labels.SelectorFromSet(services[0].Spec.Selector)
servicePods, err := s.podLister.List(selector)
if err != nil {
return false, err
return false, nil, err
}
// consider only the pods that belong to the same namespace
nsServicePods := []*api.Pod{}
@ -692,7 +689,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n
// consider any service pod and fetch the node its hosted on
otherNode, err := s.nodeInfo.GetNodeInfo(nsServicePods[0].Spec.NodeName)
if err != nil {
return false, err
return false, nil, err
}
for _, l := range s.labels {
// If the pod being scheduled has the label value specified, do not override it
@ -716,12 +713,12 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n
// check if the node matches the selector
if affinitySelector.Matches(labels.Set(node.Labels)) {
return true, nil
return true, nil, nil
}
return false, ErrServiceAffinityViolated
return false, []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}, nil
}
func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var wantPorts map[int]bool
if predicateMeta, ok := meta.(*predicateMetadata); ok {
wantPorts = predicateMeta.podPorts
@ -730,17 +727,17 @@ func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
wantPorts = getUsedPorts(pod)
}
if len(wantPorts) == 0 {
return true, nil
return true, nil, nil
}
// TODO: Aggregate it at the NodeInfo level.
existingPorts := getUsedPorts(nodeInfo.Pods()...)
for wport := range wantPorts {
if wport != 0 && existingPorts[wport] {
return false, ErrPodNotFitsHostPorts
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
}
}
return true, nil
return true, nil, nil
}
func getUsedPorts(pods ...*api.Pod) map[int]bool {
@ -773,25 +770,41 @@ func haveSame(a1, a2 []string) bool {
return false
}
func GeneralPredicates(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
fit, err := PodFitsResources(pod, meta, nodeInfo)
func GeneralPredicates(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var predicateFails []algorithm.PredicateFailureReason
fit, reasons, err := PodFitsResources(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
return fit, err
predicateFails = append(predicateFails, reasons...)
}
fit, err = PodFitsHost(pod, meta, nodeInfo)
if !fit {
return fit, err
fit, reasons, err = PodFitsHost(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
fit, err = PodFitsHostPorts(pod, meta, nodeInfo)
if !fit {
return fit, err
predicateFails = append(predicateFails, reasons...)
}
fit, reasons, err = PodFitsHostPorts(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
fit, err = PodSelectorMatches(pod, meta, nodeInfo)
if !fit {
return fit, err
predicateFails = append(predicateFails, reasons...)
}
return true, nil
fit, reasons, err = PodSelectorMatches(pod, meta, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
return len(predicateFails) == 0, predicateFails, nil
}
type PodAffinityChecker struct {
@ -809,25 +822,25 @@ func NewPodAffinityPredicate(info NodeInfo, podLister algorithm.PodLister, failu
return checker.InterPodAffinityMatches
}
func (c *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func (c *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
if !c.satisfiesExistingPodsAntiAffinity(pod, meta, node) {
return false, ErrPodAffinityNotMatch
return false, []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, nil
}
// Now check if <pod> requirements will be satisfied on this node.
affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
if err != nil {
return false, err
return false, nil, err
}
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
return true, nil
return true, nil, nil
}
if !c.satisfiesPodsAffinityAntiAffinity(pod, node, affinity) {
return false, ErrPodAffinityNotMatch
return false, []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, nil
}
if glog.V(10) {
@ -836,7 +849,7 @@ func (c *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta interfac
glog.Infof("Schedule Pod %+v on Node %+v is allowed, pod (anti)affinity constraints satisfied",
podName(pod), node.Name)
}
return true, nil
return true, nil, nil
}
// AnyPodMatchesPodAffinityTerm checks if any of given pods can match the specific podAffinityTerm.
@ -1054,26 +1067,26 @@ func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *api.Pod, nod
return true
}
func PodToleratesNodeTaints(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func PodToleratesNodeTaints(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
taints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
if err != nil {
return false, err
return false, nil, err
}
tolerations, err := api.GetTolerationsFromPodAnnotations(pod.Annotations)
if err != nil {
return false, err
return false, nil, err
}
if tolerationsToleratesTaints(tolerations, taints) {
return true, nil
return true, nil, nil
}
return false, ErrTaintsTolerationsNotMatch
return false, []algorithm.PredicateFailureReason{ErrTaintsTolerationsNotMatch}, nil
}
func tolerationsToleratesTaints(tolerations []api.Toleration, taints []api.Taint) bool {
@ -1109,10 +1122,10 @@ func isPodBestEffort(pod *api.Pod) bool {
// CheckNodeMemoryPressurePredicate checks if a pod can be scheduled on a node
// reporting memory pressure condition.
func CheckNodeMemoryPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func CheckNodeMemoryPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
var podBestEffort bool
@ -1125,33 +1138,33 @@ func CheckNodeMemoryPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *
// pod is not BestEffort pod
if !podBestEffort {
return true, nil
return true, nil, nil
}
// is node under pressure?
for _, cond := range node.Status.Conditions {
if cond.Type == api.NodeMemoryPressure && cond.Status == api.ConditionTrue {
return false, ErrNodeUnderMemoryPressure
return false, []algorithm.PredicateFailureReason{ErrNodeUnderMemoryPressure}, nil
}
}
return true, nil
return true, nil, nil
}
// CheckNodeDiskPressurePredicate checks if a pod can be scheduled on a node
// reporting disk pressure condition.
func CheckNodeDiskPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func CheckNodeDiskPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
// is node under pressure?
for _, cond := range node.Status.Conditions {
if cond.Type == api.NodeDiskPressure && cond.Status == api.ConditionTrue {
return false, ErrNodeUnderDiskPressure
return false, []algorithm.PredicateFailureReason{ErrNodeUnderDiskPressure}, nil
}
}
return true, nil
return true, nil, nil
}
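Every predicate in this file now follows the same split: an ordinary scheduling failure is returned as (false, reasons, nil), while a programmatic problem such as a missing node or a failed lister call is returned as (false, nil, err). A sketch of a custom predicate written against that convention; the predicate and the "example.com/ready" label are illustrative and not part of this commit, and the code is assumed to live in package predicates:

// requireNodeLabel fits a pod only on nodes carrying the hypothetical
// label "example.com/ready". It is a sketch of the new convention.
func requireNodeLabel(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        // Unexpected state: surface it through the error, with no reasons.
        return false, nil, fmt.Errorf("node not found")
    }
    if _, ok := node.Labels["example.com/ready"]; ok {
        return true, nil, nil
    }
    // Ordinary scheduling failure: nil error plus a reason describing the mismatch.
    return false, []algorithm.PredicateFailureReason{NewFailureReason("node is missing label example.com/ready")}, nil
}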


@ -125,7 +125,7 @@ func TestPodFitsResources(t *testing.T) {
nodeInfo *schedulercache.NodeInfo
fits bool
test string
wErr error
reasons []algorithm.PredicateFailureReason
}{
{
pod: &api.Pod{},
@ -133,7 +133,6 @@ func TestPodFitsResources(t *testing.T) {
newResourcePod(schedulercache.Resource{MilliCPU: 10, Memory: 20})),
fits: true,
test: "no resources requested always fits",
wErr: nil,
},
{
pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1}),
@ -141,39 +140,42 @@ func TestPodFitsResources(t *testing.T) {
newResourcePod(schedulercache.Resource{MilliCPU: 10, Memory: 20})),
fits: false,
test: "too many resources fails",
wErr: newInsufficientResourceError(cpuResourceName, 1, 10, 10),
reasons: []algorithm.PredicateFailureReason{
NewInsufficientResourceError(api.ResourceCPU, 1, 10, 10),
NewInsufficientResourceError(api.ResourceMemory, 1, 20, 20),
},
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1}), schedulercache.Resource{MilliCPU: 3, Memory: 1}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 8, Memory: 19})),
fits: false,
test: "too many resources fails due to init container cpu",
wErr: newInsufficientResourceError(cpuResourceName, 3, 8, 10),
fits: false,
test: "too many resources fails due to init container cpu",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourceCPU, 3, 8, 10)},
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1}), schedulercache.Resource{MilliCPU: 3, Memory: 1}, schedulercache.Resource{MilliCPU: 2, Memory: 1}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 8, Memory: 19})),
fits: false,
test: "too many resources fails due to highest init container cpu",
wErr: newInsufficientResourceError(cpuResourceName, 3, 8, 10),
fits: false,
test: "too many resources fails due to highest init container cpu",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourceCPU, 3, 8, 10)},
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1}), schedulercache.Resource{MilliCPU: 1, Memory: 3}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})),
fits: false,
test: "too many resources fails due to init container memory",
wErr: newInsufficientResourceError(memoryResourceName, 3, 19, 20),
fits: false,
test: "too many resources fails due to init container memory",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourceMemory, 3, 19, 20)},
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1}), schedulercache.Resource{MilliCPU: 1, Memory: 3}, schedulercache.Resource{MilliCPU: 1, Memory: 2}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})),
fits: false,
test: "too many resources fails due to highest init container memory",
wErr: newInsufficientResourceError(memoryResourceName, 3, 19, 20),
fits: false,
test: "too many resources fails due to highest init container memory",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourceMemory, 3, 19, 20)},
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1}), schedulercache.Resource{MilliCPU: 1, Memory: 1}),
@ -181,7 +183,6 @@ func TestPodFitsResources(t *testing.T) {
newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})),
fits: true,
test: "init container fits because it's the max, not sum, of containers and init containers",
wErr: nil,
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1}), schedulercache.Resource{MilliCPU: 1, Memory: 1}, schedulercache.Resource{MilliCPU: 1, Memory: 1}),
@ -189,7 +190,6 @@ func TestPodFitsResources(t *testing.T) {
newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})),
fits: true,
test: "multiple init containers fit because it's the max, not sum, of containers and init containers",
wErr: nil,
},
{
pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1}),
@ -197,23 +197,22 @@ func TestPodFitsResources(t *testing.T) {
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 5})),
fits: true,
test: "both resources fit",
wErr: nil,
},
{
pod: newResourcePod(schedulercache.Resource{MilliCPU: 2, Memory: 1}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 5})),
fits: false,
test: "one resource memory fits",
wErr: newInsufficientResourceError(cpuResourceName, 2, 9, 10),
fits: false,
test: "one resource memory fits",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourceCPU, 2, 9, 10)},
},
{
pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 2}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 19})),
fits: false,
test: "one resource cpu fits",
wErr: newInsufficientResourceError(memoryResourceName, 2, 19, 20),
fits: false,
test: "one resource cpu fits",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourceMemory, 2, 19, 20)},
},
{
pod: newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 1}),
@ -221,7 +220,6 @@ func TestPodFitsResources(t *testing.T) {
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 19})),
fits: true,
test: "equal edge case",
wErr: nil,
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{MilliCPU: 4, Memory: 1}), schedulercache.Resource{MilliCPU: 5, Memory: 1}),
@ -229,7 +227,6 @@ func TestPodFitsResources(t *testing.T) {
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 19})),
fits: true,
test: "equal edge case for init container",
wErr: nil,
},
}
@ -237,9 +234,12 @@ func TestPodFitsResources(t *testing.T) {
node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}}
test.nodeInfo.SetNode(&node)
fits, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, test.reasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, test.reasons)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
@ -251,48 +251,51 @@ func TestPodFitsResources(t *testing.T) {
nodeInfo *schedulercache.NodeInfo
fits bool
test string
wErr error
reasons []algorithm.PredicateFailureReason
}{
{
pod: &api.Pod{},
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 10, Memory: 20})),
fits: false,
test: "even without specified resources predicate fails when there's no space for additional pod",
wErr: newInsufficientResourceError(podCountResourceName, 1, 1, 1),
fits: false,
test: "even without specified resources predicate fails when there's no space for additional pod",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourcePods, 1, 1, 1)},
},
{
pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 5})),
fits: false,
test: "even if both resources fit predicate fails when there's no space for additional pod",
wErr: newInsufficientResourceError(podCountResourceName, 1, 1, 1),
fits: false,
test: "even if both resources fit predicate fails when there's no space for additional pod",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourcePods, 1, 1, 1)},
},
{
pod: newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 1}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 19})),
fits: false,
test: "even for equal edge case predicate fails when there's no space for additional pod",
wErr: newInsufficientResourceError(podCountResourceName, 1, 1, 1),
fits: false,
test: "even for equal edge case predicate fails when there's no space for additional pod",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourcePods, 1, 1, 1)},
},
{
pod: newResourceInitPod(newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 1}), schedulercache.Resource{MilliCPU: 5, Memory: 1}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 19})),
fits: false,
test: "even for equal edge case predicate fails when there's no space for additional pod due to init container",
wErr: newInsufficientResourceError(podCountResourceName, 1, 1, 1),
fits: false,
test: "even for equal edge case predicate fails when there's no space for additional pod due to init container",
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourcePods, 1, 1, 1)},
},
}
for _, test := range notEnoughPodsTests {
node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1)}}
test.nodeInfo.SetNode(&node)
fits, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, test.reasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, test.reasons)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
@ -342,19 +345,20 @@ func TestPodFitsHost(t *testing.T) {
test: "host doesn't match",
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrPodNotMatchHostName}
for _, test := range tests {
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(test.node)
result, err := PodFitsHost(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if !reflect.DeepEqual(err, ErrPodNotMatchHostName) && err != nil {
t.Errorf("unexpected error: %v", err)
fits, reasons, err := PodFitsHost(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if result == false && !reflect.DeepEqual(err, ErrPodNotMatchHostName) {
t.Errorf("unexpected error: %v", err)
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if result != test.fits {
t.Errorf("unexpected difference for %s: expected: %v got %v", test.test, test.fits, result)
if fits != test.fits {
t.Errorf("%s: unexpected difference: expected: %v got %v", test.test, test.fits, fits)
}
}
}
@ -418,13 +422,15 @@ func TestPodFitsHostPorts(t *testing.T) {
test: "second port",
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}
for _, test := range tests {
fits, err := PodFitsHostPorts(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if !reflect.DeepEqual(err, ErrPodNotFitsHostPorts) && err != nil {
t.Errorf("unexpected error: %v", err)
fits, reasons, err := PodFitsHostPorts(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if fits == false && !reflect.DeepEqual(err, ErrPodNotFitsHostPorts) {
t.Errorf("unexpected error: %v", err)
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if test.fits != fits {
t.Errorf("%s: expected %v, saw %v", test.test, test.fits, fits)
@ -463,7 +469,7 @@ func TestGetUsedPorts(t *testing.T) {
for _, test := range tests {
ports := getUsedPorts(test.pods...)
if !reflect.DeepEqual(test.ports, ports) {
t.Errorf("expect %v, got %v", test.ports, ports)
t.Errorf("%s: expected %v, got %v", "test get used ports", test.ports, ports)
}
}
}
@ -502,20 +508,21 @@ func TestDiskConflicts(t *testing.T) {
{&api.Pod{Spec: volState}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), false, "same state"},
{&api.Pod{Spec: volState2}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "different state"},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrDiskConflict}
for _, test := range tests {
ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil {
t.Errorf("unexpected error: %v", err)
ok, reasons, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if ok == false && !reflect.DeepEqual(err, ErrDiskConflict) {
t.Errorf("unexpected error: %v", err)
if !ok && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if test.isOk && !ok {
t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test)
t.Errorf("%s: expected ok, got none. %v %s %s", test.test, test.pod, test.nodeInfo, test.test)
}
if !test.isOk && ok {
t.Errorf("expected no ok, got one. %v %s %s", test.pod, test.nodeInfo, test.test)
t.Errorf("%s: expected no ok, got one. %v %s %s", test.test, test.pod, test.nodeInfo, test.test)
}
}
}
@ -554,20 +561,21 @@ func TestAWSDiskConflicts(t *testing.T) {
{&api.Pod{Spec: volState}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), false, "same state"},
{&api.Pod{Spec: volState2}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "different state"},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrDiskConflict}
for _, test := range tests {
ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil {
t.Errorf("unexpected error: %v", err)
ok, reasons, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if ok == false && !reflect.DeepEqual(err, ErrDiskConflict) {
t.Errorf("unexpected error: %v", err)
if !ok && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if test.isOk && !ok {
t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test)
t.Errorf("%s: expected ok, got none. %v %s %s", test.test, test.pod, test.nodeInfo, test.test)
}
if !test.isOk && ok {
t.Errorf("expected no ok, got one. %v %s %s", test.pod, test.nodeInfo, test.test)
t.Errorf("%s: expected no ok, got one. %v %s %s", test.test, test.pod, test.nodeInfo, test.test)
}
}
}
@ -612,20 +620,21 @@ func TestRBDDiskConflicts(t *testing.T) {
{&api.Pod{Spec: volState}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), false, "same state"},
{&api.Pod{Spec: volState2}, schedulercache.NewNodeInfo(&api.Pod{Spec: volState}), true, "different state"},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrDiskConflict}
for _, test := range tests {
ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil {
t.Errorf("unexpected error: %v", err)
ok, reasons, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if ok == false && !reflect.DeepEqual(err, ErrDiskConflict) {
t.Errorf("unexpected error: %v", err)
if !ok && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if test.isOk && !ok {
t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test)
t.Errorf("%s: expected ok, got none. %v %s %s", test.test, test.pod, test.nodeInfo, test.test)
}
if !test.isOk && ok {
t.Errorf("expected no ok, got one. %v %s %s", test.pod, test.nodeInfo, test.test)
t.Errorf("%s: expected no ok, got one. %v %s %s", test.test, test.pod, test.nodeInfo, test.test)
}
}
}
@ -1087,18 +1096,19 @@ func TestPodFitsSelector(t *testing.T) {
"is not satisfied, won't schedule onto the node",
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrNodeSelectorNotMatch}
for _, test := range tests {
node := api.Node{ObjectMeta: api.ObjectMeta{Labels: test.labels}}
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(&node)
fits, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil {
t.Errorf("unexpected error: %v", err)
fits, reasons, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if fits == false && !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) {
t.Errorf("unexpected error: %v", err)
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
@ -1152,18 +1162,20 @@ func TestNodeLabelPresence(t *testing.T) {
test: "all labels match, presence false",
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrNodeLabelPresenceViolated}
for _, test := range tests {
node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}}
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(&node)
labelChecker := NodeLabelChecker{test.labels, test.presence}
fits, err := labelChecker.CheckNodeLabelPresence(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) && err != nil {
t.Errorf("unexpected error: %v", err)
fits, reasons, err := labelChecker.CheckNodeLabelPresence(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if fits == false && !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) {
t.Errorf("unexpected error: %v", err)
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
@ -1297,18 +1309,19 @@ func TestServiceAffinity(t *testing.T) {
test: "service pod on different node, multiple labels, all match",
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}
for _, test := range tests {
nodes := []api.Node{node1, node2, node3, node4, node5}
serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels}
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(test.node)
fits, err := serviceAffinity.CheckServiceAffinity(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if !reflect.DeepEqual(err, ErrServiceAffinityViolated) && err != nil {
t.Errorf("unexpected error: %v", err)
fits, reasons, err := serviceAffinity.CheckServiceAffinity(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if fits == false && !reflect.DeepEqual(err, ErrServiceAffinityViolated) {
t.Errorf("unexpected error: %v", err)
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
@ -1581,14 +1594,17 @@ func TestEBSVolumeCountConflicts(t *testing.T) {
return "", false
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}
for _, test := range tests {
pred := NewMaxPDVolumeCountPredicate(filter, test.maxVols, pvInfo, pvcInfo)
fits, err := pred(test.newPod, PredicateMetadata(test.newPod, nil), schedulercache.NewNodeInfo(test.existingPods...))
if err != nil && !reflect.DeepEqual(err, ErrMaxVolumeCountExceeded) {
t.Errorf("unexpected error: %v", err)
fits, reasons, err := pred(test.newPod, PredicateMetadata(test.newPod, nil), schedulercache.NewNodeInfo(test.existingPods...))
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s: expected %v, got %v", test.test, test.fits, fits)
}
@ -1647,7 +1663,7 @@ func TestPredicatesRegistered(t *testing.T) {
// Check if all public predicates are referenced in target files.
for _, function := range functions {
// Ignore functions that doesn't match FitPredicate signature.
// Ignore functions that don't match FitPredicate signature.
signature := function.Underlying.Signature
if len(predSignature.Parameters) != len(signature.Parameters) {
continue
@ -1697,6 +1713,7 @@ func TestRunGeneralPredicates(t *testing.T) {
fits bool
test string
wErr error
reasons []algorithm.PredicateFailureReason
}{
{
pod: &api.Pod{},
@ -1719,8 +1736,12 @@ func TestRunGeneralPredicates(t *testing.T) {
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)},
},
fits: false,
wErr: newInsufficientResourceError("CPU", 8, 5, 10),
test: "not enough cpu resource",
wErr: nil,
reasons: []algorithm.PredicateFailureReason{
NewInsufficientResourceError(api.ResourceCPU, 8, 5, 10),
NewInsufficientResourceError(api.ResourceMemory, 10, 19, 20),
},
test: "not enough cpu and memory resource",
},
{
pod: &api.Pod{},
@ -1735,10 +1756,11 @@ func TestRunGeneralPredicates(t *testing.T) {
pod: newResourcePod(schedulercache.Resource{MilliCPU: 3, Memory: 1, NvidiaGPU: 1}),
nodeInfo: schedulercache.NewNodeInfo(
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 10, NvidiaGPU: 1})),
node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32)}},
fits: false,
wErr: newInsufficientResourceError("NvidiaGpu", 1, 1, 1),
test: "not enough GPU resource",
node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 1, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32)}},
fits: false,
wErr: nil,
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(api.ResourceNvidiaGPU, 1, 1, 1)},
test: "not enough GPU resource",
},
{
pod: newResourcePod(schedulercache.Resource{MilliCPU: 3, Memory: 1, NvidiaGPU: 1}),
@ -1760,9 +1782,10 @@ func TestRunGeneralPredicates(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "machine1"},
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)},
},
fits: false,
wErr: ErrPodNotMatchHostName,
test: "host not match",
fits: false,
wErr: nil,
reasons: []algorithm.PredicateFailureReason{ErrPodNotMatchHostName},
test: "host not match",
},
{
pod: newPodWithPort(123),
@ -1771,16 +1794,20 @@ func TestRunGeneralPredicates(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "machine1"},
Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)},
},
fits: false,
wErr: ErrPodNotFitsHostPorts,
test: "hostport conflict",
fits: false,
wErr: nil,
reasons: []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts},
test: "hostport conflict",
},
}
for _, test := range resourceTests {
test.nodeInfo.SetNode(test.node)
fits, err := GeneralPredicates(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
fits, reasons, err := GeneralPredicates(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, test.reasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, test.reasons)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
@ -2310,6 +2337,8 @@ func TestInterPodAffinity(t *testing.T) {
test: "verify that PodAntiAffinity from existing pod is respected when pod has no AntiAffinity constraints. satisfy PodAntiAffinity symmetry with the existing pod",
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}
for _, test := range tests {
node := test.node
var podsOnNode []*api.Pod
@ -2327,10 +2356,13 @@ func TestInterPodAffinity(t *testing.T) {
nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
nodeInfo.SetNode(test.node)
nodeInfoMap := map[string]*schedulercache.NodeInfo{test.node.Name: nodeInfo}
fits, err := fit.InterPodAffinityMatches(test.pod, PredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
if !reflect.DeepEqual(err, ErrPodAffinityNotMatch) && err != nil {
fits, reasons, err := fit.InterPodAffinityMatches(test.pod, PredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error %v", test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s: expected %v got %v", test.test, test.fits, fits)
}
@ -2476,6 +2508,9 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
"should not be blocked from being scheduled onto any node, even there's no existing pod that match the rule anywhere.",
},
}
affinityExpectedFailureReasons := []algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}
selectorExpectedFailureReasons := []algorithm.PredicateFailureReason{ErrNodeSelectorNotMatch}
for _, test := range tests {
nodeListInfo := FakeNodeListInfo(test.nodes)
for _, node := range test.nodes {
@ -2494,21 +2529,27 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
nodeInfo.SetNode(&node)
nodeInfoMap := map[string]*schedulercache.NodeInfo{node.Name: nodeInfo}
fits, err := testFit.InterPodAffinityMatches(test.pod, PredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
if !reflect.DeepEqual(err, ErrPodAffinityNotMatch) && err != nil {
fits, reasons, err := testFit.InterPodAffinityMatches(test.pod, PredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error %v", test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, affinityExpectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v", test.test, reasons)
}
affinity, err := api.GetAffinityFromPodAnnotations(test.pod.ObjectMeta.Annotations)
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if affinity != nil && affinity.NodeAffinity != nil {
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(&node)
nodeInfoMap := map[string]*schedulercache.NodeInfo{node.Name: nodeInfo}
fits2, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil {
t.Errorf("unexpected error: %v", err)
fits2, reasons, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod, nodeInfoMap), nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if !fits2 && !reflect.DeepEqual(reasons, selectorExpectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, selectorExpectedFailureReasons)
}
fits = fits && fits2
}
@ -2788,14 +2829,18 @@ func TestPodToleratesTaints(t *testing.T) {
"but the effect of taint on node is PreferNochedule. Pod can be shceduled onto the node",
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrTaintsTolerationsNotMatch}
for _, test := range podTolerateTaintsTests {
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(&test.node)
fits, err := PodToleratesNodeTaints(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if fits == false && !reflect.DeepEqual(err, ErrTaintsTolerationsNotMatch) {
fits, reasons, err := PodToleratesNodeTaints(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if err != nil {
t.Errorf("%s, unexpected error: %v", test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s, unexpected failure reason: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s, expected: %v got %v", test.test, test.fits, fits)
}
@ -2896,17 +2941,19 @@ func TestPodSchedulesOnNodeWithMemoryPressureCondition(t *testing.T) {
name: "non best-effort pod schedulable on node without memory pressure condition on",
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrNodeUnderMemoryPressure}
for _, test := range tests {
fits, err := CheckNodeMemoryPressurePredicate(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
fits, reasons, err := CheckNodeMemoryPressurePredicate(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.name, err)
}
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.name, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s: expected %v got %v", test.name, test.fits, fits)
}
if err != nil && err != ErrNodeUnderMemoryPressure {
t.Errorf("%s: unexpected error: %v", test.name, err)
continue
}
}
}
@ -2966,15 +3013,18 @@ func TestPodSchedulesOnNodeWithDiskPressureCondition(t *testing.T) {
name: "pod not schedulable on node with pressure condition on",
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrNodeUnderDiskPressure}
for _, test := range tests {
fits, err := CheckNodeDiskPressurePredicate(test.pod, nil, test.nodeInfo)
fits, reasons, err := CheckNodeDiskPressurePredicate(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.name, err)
}
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.name, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s: expected %v got %v", test.name, test.fits, fits)
}
if err != nil && err != ErrNodeUnderDiskPressure {
t.Errorf("%s: unexpected error: %v", test.name, err)
}
}
}

View File

@@ -24,7 +24,7 @@ import (
// FitPredicate is a function that indicates if a pod fits into an existing node.
// The failure information is given by the returned PredicateFailureReasons; the error is reserved for unexpected conditions.
type FitPredicate func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error)
type FitPredicate func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)
type PriorityFunction func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error)
@@ -32,3 +32,7 @@ type PriorityConfig struct {
Function PriorityFunction
Weight int
}
type PredicateFailureReason interface {
GetReason() string
}
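The new contract splits ordinary "does not fit" outcomes from genuine errors: predicates report failures through PredicateFailureReason values and reserve the error return for unexpected conditions. As orientation only (not part of this commit), here is a minimal sketch of a custom predicate written against the new signature; the package name, label key, and type names below are hypothetical.

package examplepredicate

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// notReadyReason is a hypothetical PredicateFailureReason implementation.
type notReadyReason struct{}

func (r *notReadyReason) GetReason() string { return "node is not labeled example.com/ready=true" }

// PodFitsExampleLabel follows the new FitPredicate signature: an ordinary
// "does not fit" result is reported through the reasons slice, while the
// error return is reserved for unexpected conditions such as a missing node.
func PodFitsExampleLabel(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if node.Labels["example.com/ready"] != "true" {
		return false, []algorithm.PredicateFailureReason{&notReadyReason{}}, nil
	}
	return true, nil, nil
}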

View File

@@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -115,12 +116,12 @@ func TestCreateFromEmptyConfig(t *testing.T) {
factory.CreateFromConfig(policy)
}
func PredicateOne(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return true, nil
func PredicateOne(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return true, nil, nil
}
func PredicateTwo(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return true, nil
func PredicateTwo(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return true, nil, nil
}
func PriorityOne(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {

View File

@@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@@ -35,7 +36,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
type FailedPredicateMap map[string]string
type FailedPredicateMap map[string][]algorithm.PredicateFailureReason
type FitError struct {
Pod *api.Pod
@@ -48,9 +49,13 @@ var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
func (f *FitError) Error() string {
var buf bytes.Buffer
buf.WriteString(fmt.Sprintf("pod (%s) failed to fit in any node\n", f.Pod.Name))
for node, predicate := range f.FailedPredicates {
reason := fmt.Sprintf("fit failure on node (%s): %s\n", node, predicate)
buf.WriteString(reason)
for node, predicates := range f.FailedPredicates {
reasons := make([]string, 0)
for _, pred := range predicates {
reasons = append(reasons, pred.GetReason())
}
reasonMsg := fmt.Sprintf("fit failure on node (%s): %s\n", node, strings.Join(reasons, ", "))
buf.WriteString(reasonMsg)
}
return buf.String()
}
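For illustration, the aggregated message produced by Error() now reads roughly as follows; the node names and reason strings here are hypothetical, but the layout follows the format strings above (one line per node, reasons joined with ", "):

pod (nginx) failed to fit in any node
fit failure on node (machine1): Insufficient cpu, Insufficient memory
fit failure on node (machine2): PodFitsHostPorts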
@@ -159,7 +164,7 @@ func findNodesThatFit(
var filteredLen int32
checkNode := func(i int) {
nodeName := nodes[i].Name
fits, failedPredicate, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
if err != nil {
predicateResultLock.Lock()
errs = append(errs, err)
@@ -170,7 +175,7 @@ func findNodesThatFit(
filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
} else {
predicateResultLock.Lock()
failedPredicateMap[nodeName] = failedPredicate
failedPredicateMap[nodeName] = failedPredicates
predicateResultLock.Unlock()
}
}
@@ -189,7 +194,10 @@ func findNodesThatFit(
}
for failedNodeName, failedMsg := range failedMap {
failedPredicateMap[failedNodeName] = failedMsg
if _, found := failedPredicateMap[failedNodeName]; !found {
failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
}
failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
}
filtered = filteredList
if len(filtered) == 0 {
@@ -201,38 +209,19 @@ func findNodesThatFit(
}
// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
func podFitsOnNode(pod *api.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, string, error) {
func podFitsOnNode(pod *api.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) {
var failedPredicates []algorithm.PredicateFailureReason
for _, predicate := range predicateFuncs {
fit, err := predicate(pod, meta, info)
fit, reasons, err := predicate(pod, meta, info)
if err != nil {
switch e := err.(type) {
case *predicates.InsufficientResourceError:
if fit {
err := fmt.Errorf("got InsufficientResourceError: %v, but also fit='true' which is unexpected", e)
return false, "", err
}
case *predicates.PredicateFailureError:
if fit {
err := fmt.Errorf("got PredicateFailureError: %v, but also fit='true' which is unexpected", e)
return false, "", err
}
default:
return false, "", err
}
err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
return false, []algorithm.PredicateFailureReason{}, err
}
if !fit {
if re, ok := err.(*predicates.InsufficientResourceError); ok {
return false, fmt.Sprintf("Insufficient %s", re.ResourceName), nil
}
if re, ok := err.(*predicates.PredicateFailureError); ok {
return false, re.PredicateName, nil
} else {
err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
return false, "", err
}
failedPredicates = append(failedPredicates, reasons...)
}
}
return true, "", nil
return len(failedPredicates) == 0, failedPredicates, nil
}
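Note the behavioral change: instead of returning on the first failed predicate with a single string, podFitsOnNode now runs every predicate and accumulates all failure reasons. Below is a self-contained sketch of the same accumulation pattern using hypothetical always-failing predicates; the helper name and messages are illustrative and not part of this commit.

package examplescheduler

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// collectAllReasons mirrors what podFitsOnNode now does internally: both
// failing predicates contribute a reason instead of only the first one.
func collectAllReasons(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) []algorithm.PredicateFailureReason {
	predicateFuncs := map[string]algorithm.FitPredicate{
		"AlwaysFailsA": func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
			return false, []algorithm.PredicateFailureReason{predicates.NewFailureReason("A failed")}, nil
		},
		"AlwaysFailsB": func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
			return false, []algorithm.PredicateFailureReason{predicates.NewFailureReason("B failed")}, nil
		},
	}
	var failed []algorithm.PredicateFailureReason
	for _, p := range predicateFuncs {
		if fit, reasons, err := p(pod, nil, nodeInfo); err == nil && !fit {
			failed = append(failed, reasons...)
		}
	}
	return failed // contains both "A failed" and "B failed" (map iteration order is not fixed)
}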
// Prioritizes the nodes by running the individual priority functions in parallel.

View File

@@ -33,30 +33,30 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
func falsePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return false, algorithmpredicates.ErrFakePredicate
func falsePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
func truePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return true, nil
func truePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return true, nil, nil
}
func matchesPredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func matchesPredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
return false, nil, fmt.Errorf("node not found")
}
if pod.Name == node.Name {
return true, nil
return true, nil, nil
}
return false, algorithmpredicates.ErrFakePredicate
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
func hasNoPodsPredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func hasNoPodsPredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
if len(nodeInfo.Pods()) == 0 {
return true, nil
return true, nil, nil
}
return false, algorithmpredicates.ErrFakePredicate
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
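These fakes rely on ErrFakePredicate being usable both as an error (previously) and as a PredicateFailureReason (now), and the assertions below compare it by pointer identity. A hedged sketch of a sentinel shape that would satisfy both interfaces follows; the actual definition in the predicates package may differ.

package predicatesketch

// PredicateFailureError is sketched here only to show why a single sentinel
// can serve as both error and algorithm.PredicateFailureReason; the field
// name and the "FakePredicateError" value come from the tests in this diff,
// while the method bodies are assumptions.
type PredicateFailureError struct {
	PredicateName string
}

func (e *PredicateFailureError) Error() string     { return "Predicate " + e.PredicateName + " failed" }
func (e *PredicateFailureError) GetReason() string { return e.PredicateName }

var ErrFakePredicate = &PredicateFailureError{PredicateName: "FakePredicateError"}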
func numericPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
@@ -193,8 +193,8 @@ func TestGenericScheduler(t *testing.T) {
wErr: &FitError{
Pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
FailedPredicates: FailedPredicateMap{
"machine1": algorithmpredicates.ErrFakePredicate.PredicateName,
"machine2": algorithmpredicates.ErrFakePredicate.PredicateName,
"machine1": []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
"machine2": []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
}},
},
{
@@ -251,9 +251,9 @@ func TestGenericScheduler(t *testing.T) {
wErr: &FitError{
Pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
FailedPredicates: FailedPredicateMap{
"3": algorithmpredicates.ErrFakePredicate.PredicateName,
"2": algorithmpredicates.ErrFakePredicate.PredicateName,
"1": algorithmpredicates.ErrFakePredicate.PredicateName,
"3": []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
"2": []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
"1": []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
},
},
},
@@ -282,8 +282,8 @@ func TestGenericScheduler(t *testing.T) {
wErr: &FitError{
Pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
FailedPredicates: FailedPredicateMap{
"1": algorithmpredicates.ErrFakePredicate.PredicateName,
"2": algorithmpredicates.ErrFakePredicate.PredicateName,
"1": []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
"2": []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
},
},
},
@@ -327,12 +327,12 @@ func TestFindFitAllError(t *testing.T) {
}
for _, node := range nodes {
failure, found := predicateMap[node]
failures, found := predicateMap[node]
if !found {
t.Errorf("failed to find node: %s in %v", node, predicateMap)
}
if failure != "FakePredicateError" {
t.Errorf("unexpected failures: %v", failure)
if len(failures) != 1 || failures[0] != algorithmpredicates.ErrFakePredicate {
t.Errorf("unexpected failures: %v", failures)
}
}
}
@@ -351,7 +351,7 @@ func TestFindFitSomeError(t *testing.T) {
}
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil)
if err != nil && !reflect.DeepEqual(err, algorithmpredicates.ErrFakePredicate) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -363,12 +363,12 @@ func TestFindFitSomeError(t *testing.T) {
if node == pod.Name {
continue
}
failure, found := predicateMap[node]
failures, found := predicateMap[node]
if !found {
t.Errorf("failed to find node: %s in %v", node, predicateMap)
}
if failure != "FakePredicateError" {
t.Errorf("unexpected failures: %v", failure)
if len(failures) != 1 || failures[0] != algorithmpredicates.ErrFakePredicate {
t.Errorf("unexpected failures: %v", failures)
}
}
}

View File

@@ -23,6 +23,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
clientcache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
@@ -64,6 +65,14 @@ func podWithPort(id, desiredHost string, port int) *api.Pod {
return pod
}
func podWithResources(id, desiredHost string, limits api.ResourceList, requests api.ResourceList) *api.Pod {
pod := podWithID(id, desiredHost)
pod.Spec.Containers = []api.Container{
{Name: "ctr", Resources: api.ResourceRequirements{Limits: limits, Requests: requests}},
}
return pod
}
type mockScheduler struct {
machine string
err error
@@ -78,6 +87,7 @@ func TestScheduler(t *testing.T) {
eventBroadcaster.StartLogging(t.Logf).Stop()
errS := errors.New("scheduler")
errB := errors.New("binder")
testNode := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1"}}
table := []struct {
injectBindError error
@@ -91,21 +101,21 @@ }{
}{
{
sendPod: podWithID("foo", ""),
algo: mockScheduler{"machine1", nil},
expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: "machine1"}},
expectAssumedPod: podWithID("foo", "machine1"),
algo: mockScheduler{testNode.Name, nil},
expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: testNode.Name}},
expectAssumedPod: podWithID("foo", testNode.Name),
eventReason: "Scheduled",
}, {
sendPod: podWithID("foo", ""),
algo: mockScheduler{"machine1", errS},
algo: mockScheduler{testNode.Name, errS},
expectError: errS,
expectErrorPod: podWithID("foo", ""),
eventReason: "FailedScheduling",
}, {
sendPod: podWithID("foo", ""),
algo: mockScheduler{"machine1", nil},
expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: "machine1"}},
expectAssumedPod: podWithID("foo", "machine1"),
algo: mockScheduler{testNode.Name, nil},
expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: testNode.Name}},
expectAssumedPod: podWithID("foo", testNode.Name),
injectBindError: errB,
expectError: errB,
expectErrorPod: podWithID("foo", ""),
@@ -125,7 +135,7 @@ },
},
},
NodeLister: algorithm.FakeNodeLister(
[]*api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}},
[]*api.Node{&testNode},
),
Algorithm: item.algo,
Binder: fakeBinder{func(b *api.Binding) error {
@@ -174,7 +184,10 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := schedulercache.New(100*time.Millisecond, stop)
pod := podWithPort("pod.Name", "", 8080)
scheduler, bindingChan, _ := setupTestSchedulerWithOnePod(t, queuedPodStore, scache, pod)
node := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1"}}
nodeLister := algorithm.FakeNodeLister([]*api.Node{&node})
predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, pod, &node)
waitPodExpireChan := make(chan struct{})
timeout := make(chan struct{})
@@ -212,7 +225,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
case b := <-bindingChan:
expectBinding := &api.Binding{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
Target: api.ObjectReference{Kind: "Node", Name: node.Name},
}
if !reflect.DeepEqual(expectBinding, b) {
t.Errorf("binding want=%v, get=%v", expectBinding, b)
@@ -228,7 +241,10 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := schedulercache.New(10*time.Minute, stop)
firstPod := podWithPort("pod.Name", "", 8080)
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePod(t, queuedPodStore, scache, firstPod)
node := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1"}}
nodeLister := algorithm.FakeNodeLister([]*api.Node{&node})
predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, firstPod, &node)
// We use conflicting pod ports to trigger a fit predicate failure.
secondPod := podWithPort("bar", "", 8080)
@@ -241,7 +257,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
case err := <-errChan:
expectErr := &FitError{
Pod: secondPod,
FailedPredicates: FailedPredicateMap{"machine1": "PodFitsHostPorts"},
FailedPredicates: FailedPredicateMap{node.Name: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}},
}
if !reflect.DeepEqual(expectErr, err) {
t.Errorf("err want=%v, get=%v", expectErr, err)
@@ -254,7 +270,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
// Note: if the schedulercache timeout were very short, the first pod would expire
// and be removed on its own (without any explicit action on schedulercache). Even in that case,
// an explicit AddPod would still correct the behavior.
firstPod.Spec.NodeName = "machine1"
firstPod.Spec.NodeName = node.Name
if err := scache.AddPod(firstPod); err != nil {
t.Fatalf("err: %v", err)
}
@@ -268,7 +284,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
case b := <-bindingChan:
expectBinding := &api.Binding{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
Target: api.ObjectReference{Kind: "Node", Name: node.Name},
}
if !reflect.DeepEqual(expectBinding, b) {
t.Errorf("binding want=%v, get=%v", expectBinding, b)
@@ -280,21 +296,102 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePod(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, pod *api.Pod) (*Scheduler, chan *api.Binding, chan error) {
// Create the scheduler config
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache,
nodeLister algorithm.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, pod *api.Pod, node *api.Node) (*Scheduler, chan *api.Binding, chan error) {
scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap)
queuedPodStore.Add(pod)
// queuedPodStore: [foo:8080]
// cache: []
scheduler.scheduleOne()
// queuedPodStore: []
// cache: [(assumed)foo:8080]
select {
case b := <-bindingChan:
expectBinding := &api.Binding{
ObjectMeta: api.ObjectMeta{Name: pod.Name},
Target: api.ObjectReference{Kind: "Node", Name: node.Name},
}
if !reflect.DeepEqual(expectBinding, b) {
t.Errorf("binding want=%v, get=%v", expectBinding, b)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
}
return scheduler, bindingChan, errChan
}
func TestSchedulerFailedSchedulingReasons(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := schedulercache.New(10*time.Minute, stop)
node := api.Node{
ObjectMeta: api.ObjectMeta{Name: "machine1"},
Status: api.NodeStatus{
Capacity: api.ResourceList{
api.ResourceCPU: *(resource.NewQuantity(2, resource.DecimalSI)),
api.ResourceMemory: *(resource.NewQuantity(100, resource.DecimalSI)),
api.ResourcePods: *(resource.NewQuantity(10, resource.DecimalSI)),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *(resource.NewQuantity(2, resource.DecimalSI)),
api.ResourceMemory: *(resource.NewQuantity(100, resource.DecimalSI)),
api.ResourcePods: *(resource.NewQuantity(10, resource.DecimalSI)),
}},
}
scache.AddNode(&node)
nodeLister := algorithm.FakeNodeLister([]*api.Node{&node})
predicateMap := map[string]algorithm.FitPredicate{
"PodFitsResources": predicates.PodFitsResources,
}
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap)
podWithTooBigResourceRequests := podWithResources("bar", "", api.ResourceList{
api.ResourceCPU: *(resource.NewQuantity(4, resource.DecimalSI)),
api.ResourceMemory: *(resource.NewQuantity(500, resource.DecimalSI)),
}, api.ResourceList{
api.ResourceCPU: *(resource.NewQuantity(4, resource.DecimalSI)),
api.ResourceMemory: *(resource.NewQuantity(500, resource.DecimalSI)),
})
queuedPodStore.Add(podWithTooBigResourceRequests)
scheduler.scheduleOne()
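// The pod requests 4 CPU and 500 memory while the node only has 2 CPU and
// 100 memory allocatable, so PodFitsResources should reject it with one
// InsufficientResourceError per resource; CPU quantities appear below in
// millicores (4000 requested vs. 2000 allocatable).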
select {
case err := <-errChan:
expectErr := &FitError{
Pod: podWithTooBigResourceRequests,
FailedPredicates: FailedPredicateMap{node.Name: []algorithm.PredicateFailureReason{
predicates.NewInsufficientResourceError(api.ResourceCPU, 4000, 0, 2000),
predicates.NewInsufficientResourceError(api.ResourceMemory, 500, 0, 100),
}},
}
if !reflect.DeepEqual(expectErr, err) {
t.Errorf("err want=%+v, get=%+v", expectErr, err)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
}
}
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister algorithm.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate) (*Scheduler, chan *api.Binding, chan error) {
algo := NewGenericScheduler(
scache,
map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts},
predicateMap,
[]algorithm.PriorityConfig{},
[]algorithm.SchedulerExtender{})
bindingChan := make(chan *api.Binding, 1)
errChan := make(chan error, 1)
cfg := &Config{
SchedulerCache: scache,
NodeLister: algorithm.FakeNodeLister(
[]*api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}},
),
Algorithm: algo,
NodeLister: nodeLister,
Algorithm: algo,
Binder: fakeBinder{func(b *api.Binding) error {
bindingChan <- b
return nil
@@ -308,27 +405,5 @@ func setupTestSchedulerWithOnePod(t *testing.T, queuedPodStore *clientcache.FIFO
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
}
scheduler := New(cfg)
queuedPodStore.Add(pod)
// queuedPodStore: [foo:8080]
// cache: []
scheduler.scheduleOne()
// queuedPodStore: []
// cache: [(assumed)foo:8080]
select {
case b := <-bindingChan:
expectBinding := &api.Binding{
ObjectMeta: api.ObjectMeta{Name: "pod.Name"},
Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
}
if !reflect.DeepEqual(expectBinding, b) {
t.Errorf("binding want=%v, get=%v", expectBinding, b)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
}
return scheduler, bindingChan, errChan
return New(cfg), bindingChan, errChan
}