diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e0cce769f38..22a6dee9f50 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -88,6 +88,7 @@ import ( "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/third_party/golang/expansion" ) @@ -2323,32 +2324,63 @@ func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) { Message: "Pod " + message}) } +// getNodeAnyWay() must return a *api.Node which is required by RunGeneralPredicates(). +// The *api.Node is obtained as follows: +// Return kubelet's nodeInfo for this node, except on error or if in standalone mode, +// in which case return a manufactured nodeInfo representing a node with no pods, +// zero capacity, and the default labels. +func (kl *Kubelet) getNodeAnyWay() (*api.Node, error) { + if !kl.standaloneMode { + if n, err := kl.nodeInfo.GetNodeInfo(kl.nodeName); err == nil { + return n, nil + } + } + return kl.initialNodeStatus() +} + // canAdmitPod determines if a pod can be admitted, and gives a reason if it // cannot. "pod" is new pod, while "pods" include all admitted pods plus the // new pod. The function returns a boolean value indicating whether the pod // can be admitted, a brief single-word reason and a message explaining why // the pod cannot be admitted. -// -// This needs to be kept in sync with the scheduler's and daemonset's fit predicates, -// otherwise there will inevitably be pod delete create loops. This will be fixed -// once we can extract these predicates into a common library. (#12744) func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) { - if hasHostPortConflicts(pods) { - return false, "HostPortConflict", "cannot start the pod due to host port conflict." + node, err := kl.getNodeAnyWay() + if err != nil { + glog.Errorf("Cannot get Node info: %v", err) + return false, "InvalidNodeInfo", "Kubelet cannot get node info." } - if !kl.matchesNodeSelector(pod) { - return false, "NodeSelectorMismatching", "cannot be started due to node selector mismatch" + otherPods := []*api.Pod{} + for _, p := range pods { + if p != pod { + otherPods = append(otherPods, p) + } } - cpu, memory := kl.hasInsufficientfFreeResources(pods) - if cpu { - return false, "InsufficientFreeCPU", "cannot start the pod due to insufficient free CPU." 
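The rewritten canAdmitPod above no longer hand-codes each check: it fetches a *api.Node (getNodeAnyWay falls back to a synthesized node in standalone mode), drops the pod being admitted from the pod list, and hands the rest to the scheduler's general predicates, turning the resulting error into a rejection reason. A minimal, self-contained sketch of that shape, using stand-in Node/Pod types and injected getNode/runPredicates functions rather than the real kubelet and scheduler APIs:

package main

import (
	"errors"
	"fmt"
)

// Stand-in types: the real code works with *api.Pod, *api.Node and
// schedulercache.NodeInfo.
type Node struct{ Name string }

type Pod struct{ Name string }

// admit mirrors the shape of the new canAdmitPod: node lookup, "other pods"
// filtering, then a single predicate call whose error carries the reason.
func admit(
	getNode func() (*Node, error),
	runPredicates func(pod *Pod, others []*Pod, node *Node) (bool, error),
	pods []*Pod, pod *Pod,
) (bool, string) {
	node, err := getNode()
	if err != nil {
		return false, "InvalidNodeInfo"
	}
	// Exclude the pod being admitted so it is not counted against itself.
	otherPods := make([]*Pod, 0, len(pods))
	for _, p := range pods {
		if p != pod {
			otherPods = append(otherPods, p)
		}
	}
	fit, err := runPredicates(pod, otherPods, node)
	if !fit {
		if err != nil {
			return false, err.Error()
		}
		return false, "UnexpectedPredicateFailure"
	}
	return true, ""
}

func main() {
	pods := []*Pod{{Name: "new"}, {Name: "existing"}}
	ok, reason := admit(
		func() (*Node, error) { return &Node{Name: "node-1"}, nil },
		func(pod *Pod, others []*Pod, node *Node) (bool, error) {
			// Toy predicate: reject whenever any other pod is present.
			if len(others) > 0 {
				return false, errors.New("HostPortConflict")
			}
			return true, nil
		},
		pods, pods[0],
	)
	fmt.Println(ok, reason) // false HostPortConflict
}
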
- } else if memory { - return false, "InsufficientFreeMemory", "cannot be started due to insufficient free memory" + nodeInfo := schedulercache.CreateNodeNameToInfoMap(otherPods)[kl.nodeName] + fit, err := predicates.RunGeneralPredicates(pod, kl.nodeName, nodeInfo, node) + if !fit { + if re, ok := err.(*predicates.PredicateFailureError); ok { + reason := re.PredicateName + message := re.Error() + glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message) + return fit, reason, message + } + if re, ok := err.(*predicates.InsufficientResourceError); ok { + reason := fmt.Sprintf("OutOf%s", re.ResourceName) + message := re.Error() + glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message) + return fit, reason, message + } + reason := "UnexpectedPredicateFailureType" + message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err) + glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message) + return fit, reason, message } + // TODO: When disk space scheduling is implemented (#11976), remove the out-of-disk check here and + // add the disk space predicate to predicates.GeneralPredicates. if kl.isOutOfDisk() { + glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to isOutOfDisk") return false, "OutOfDisk", "cannot be started due to lack of disk space." } - return true, "", "" } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index aa5ce3c75f9..9d215acb0e1 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -2365,7 +2365,14 @@ func TestHandlePortConflicts(t *testing.T) { testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) - spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}} + kl.nodeLister = testNodeLister{nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: kl.nodeName}}, + }} + kl.nodeInfo = testNodeInfo{nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: kl.nodeName}}, + }} + + spec := api.PodSpec{NodeName: kl.nodeName, Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}} pods := []*api.Pod{ { ObjectMeta: api.ObjectMeta{ @@ -2388,17 +2395,90 @@ func TestHandlePortConflicts(t *testing.T) { pods[1].CreationTimestamp = unversioned.NewTime(time.Now()) pods[0].CreationTimestamp = unversioned.NewTime(time.Now().Add(1 * time.Second)) // The newer pod should be rejected. - conflictedPod := pods[0] + notfittingPod := pods[0] + fittingPod := pods[1] kl.HandlePodAdditions(pods) // Check pod status stored in the status map. - status, found := kl.statusManager.GetPodStatus(conflictedPod.UID) + // notfittingPod should be Failed + status, found := kl.statusManager.GetPodStatus(notfittingPod.UID) if !found { - t.Fatalf("status of pod %q is not found in the status map", conflictedPod.UID) + t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID) } if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) } + // fittingPod should be Pending + status, found = kl.statusManager.GetPodStatus(fittingPod.UID) + if !found { + t.Fatalf("status of pod %q is not found in the status map", fittingPod.UID) + } + if status.Phase != api.PodPending { + t.Fatalf("expected pod status %q. 
Got %q.", api.PodPending, status.Phase) + } +} + +// Tests that we handle host name conflicts correctly by setting the failed status in status map. +func TestHandleHostNameConflicts(t *testing.T) { + testKubelet := newTestKubelet(t) + kl := testKubelet.kubelet + testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil) + testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) + testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) + + kl.nodeLister = testNodeLister{nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}}, + }} + kl.nodeInfo = testNodeInfo{nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}}, + }} + + pods := []*api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "123456789", + Name: "notfittingpod", + Namespace: "foo", + }, + Spec: api.PodSpec{ + // default NodeName in test is 127.0.0.1 + NodeName: "127.0.0.2", + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "987654321", + Name: "fittingpod", + Namespace: "foo", + }, + Spec: api.PodSpec{ + // default NodeName in test is 127.0.0.1 + NodeName: "127.0.0.1", + }, + }, + } + + notfittingPod := pods[0] + fittingPod := pods[1] + + kl.HandlePodAdditions(pods) + // Check pod status stored in the status map. + // notfittingPod should be Failed + status, found := kl.statusManager.GetPodStatus(notfittingPod.UID) + if !found { + t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID) + } + if status.Phase != api.PodFailed { + t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) + } + // fittingPod should be Pending + status, found = kl.statusManager.GetPodStatus(fittingPod.UID) + if !found { + t.Fatalf("status of pod %q is not found in the status map", fittingPod.UID) + } + if status.Phase != api.PodPending { + t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase) + } } // Tests that we handle not matching labels selector correctly by setting the failed status in status map. @@ -2434,9 +2514,11 @@ func TestHandleNodeSelector(t *testing.T) { } // The first pod should be rejected. notfittingPod := pods[0] + fittingPod := pods[1] kl.HandlePodAdditions(pods) // Check pod status stored in the status map. + // notfittingPod should be Failed status, found := kl.statusManager.GetPodStatus(notfittingPod.UID) if !found { t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID) @@ -2444,21 +2526,46 @@ func TestHandleNodeSelector(t *testing.T) { if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) } + // fittingPod should be Pending + status, found = kl.statusManager.GetPodStatus(fittingPod.UID) + if !found { + t.Fatalf("status of pod %q is not found in the status map", fittingPod.UID) + } + if status.Phase != api.PodPending { + t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase) + } } // Tests that we handle exceeded resources correctly by setting the failed status in status map. 
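Each admission test above now follows the same two-pod pattern: the pod that violates a predicate must end up Failed, while its sibling must stay Pending. A condensed sketch of that assertion pattern as a plain Go test; the phase constants and the status map here are simplified stand-ins for api.PodPhase and the kubelet's statusManager:

package admission

import "testing"

type phase string

const (
	podPending phase = "Pending"
	podFailed  phase = "Failed"
)

// checkPhases asserts the Failed/Pending split used by the kubelet admission tests.
func checkPhases(t *testing.T, statusOf map[string]phase, notFitting, fitting string) {
	t.Helper()
	got, found := statusOf[notFitting]
	if !found {
		t.Fatalf("status of pod %q is not found in the status map", notFitting)
	}
	if got != podFailed {
		t.Fatalf("expected pod status %q. Got %q.", podFailed, got)
	}
	got, found = statusOf[fitting]
	if !found {
		t.Fatalf("status of pod %q is not found in the status map", fitting)
	}
	if got != podPending {
		t.Fatalf("expected pod status %q. Got %q.", podPending, got)
	}
}

func TestFailedPendingSplit(t *testing.T) {
	statuses := map[string]phase{
		"notfittingpod": podFailed,
		"fittingpod":    podPending,
	}
	checkPhases(t, statuses, "notfittingpod", "fittingpod")
}

Factoring the repeated checks into a helper like checkPhases is one way to keep the four tests touched here from drifting apart.
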
func TestHandleMemExceeded(t *testing.T) { testKubelet := newTestKubelet(t) kl := testKubelet.kubelet - testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{MemoryCapacity: 100}, nil) + kl.nodeLister = testNodeLister{nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(10, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(40, resource.DecimalSI), + }}}, + }} + kl.nodeInfo = testNodeInfo{nodes: []api.Node{ + {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}, + Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(10, resource.DecimalSI), + api.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI), + api.ResourcePods: *resource.NewQuantity(40, resource.DecimalSI), + }}}, + }} + testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil) testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) - spec := api.PodSpec{Containers: []api.Container{{Resources: api.ResourceRequirements{ - Requests: api.ResourceList{ - "memory": resource.MustParse("90"), - }, - }}}} + spec := api.PodSpec{NodeName: kl.nodeName, + Containers: []api.Container{{Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + "memory": resource.MustParse("90"), + }, + }}}} pods := []*api.Pod{ { ObjectMeta: api.ObjectMeta{ @@ -2482,9 +2589,11 @@ func TestHandleMemExceeded(t *testing.T) { pods[0].CreationTimestamp = unversioned.NewTime(time.Now().Add(1 * time.Second)) // The newer pod should be rejected. notfittingPod := pods[0] + fittingPod := pods[1] kl.HandlePodAdditions(pods) // Check pod status stored in the status map. + // notfittingPod should be Failed status, found := kl.statusManager.GetPodStatus(notfittingPod.UID) if !found { t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID) @@ -2492,6 +2601,14 @@ func TestHandleMemExceeded(t *testing.T) { if status.Phase != api.PodFailed { t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) } + // fittingPod should be Pending + status, found = kl.statusManager.GetPodStatus(fittingPod.UID) + if !found { + t.Fatalf("status of pod %q is not found in the status map", fittingPod.UID) + } + if status.Phase != api.PodPending { + t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase) + } } // TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal. 
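TestHandleMemExceeded passes because admission now compares requests against the node's Allocatable (10 millicores of CPU, 100 memory units, 40 pods above) instead of cadvisor's MachineInfo: the first 90-unit request fits, the second pushes requested-plus-used past 100. A small sketch of that arithmetic, simplifying podFitsResourcesInternal down to plain int64 values instead of resource.Quantity:

package main

import "fmt"

// fitsMemory reports whether a pod requesting `request` units still fits on a
// node with `allocatable` units when admitted pods already request `used`,
// mirroring the allocatable-based check the kubelet now shares with the scheduler.
func fitsMemory(request, used, allocatable int64) (bool, error) {
	if request == 0 {
		return true, nil // pods with no request always pass this check
	}
	if allocatable < request+used {
		return false, fmt.Errorf(
			"Node didn't have enough resource: Memory, requested: %d, used: %d, capacity: %d",
			request, used, allocatable)
	}
	return true, nil
}

func main() {
	fmt.Println(fitsMemory(90, 0, 100))  // first pod: fits
	fmt.Println(fitsMemory(90, 90, 100)) // second pod: rejected
}
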
@@ -2500,6 +2617,12 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil) testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) + versionInfo := &cadvisorapi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + testKubelet.fakeCadvisor.On("VersionInfo").Return(versionInfo, nil) kl := testKubelet.kubelet pods := []*api.Pod{ diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 39c1dc874f2..a999d5d839d 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -24,6 +24,7 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" cadvisorapiv2 "github.com/google/cadvisor/info/v2" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/record" cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" @@ -73,6 +74,7 @@ func TestRunOnce(t *testing.T) { containerRuntime: fakeRuntime, reasonCache: NewReasonCache(), clock: util.RealClock{}, + kubeClient: &fake.Clientset{}, } kb.containerManager = cm.NewStubContainerManager() diff --git a/plugin/pkg/scheduler/algorithm/predicates/error.go b/plugin/pkg/scheduler/algorithm/predicates/error.go index b8d93689220..d746bd11a1e 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/error.go +++ b/plugin/pkg/scheduler/algorithm/predicates/error.go @@ -24,11 +24,27 @@ const ( memoryResoureceName string = "Memory" ) +var ( + // The predicateName tries to be consistent as the predicate name used in DefaultAlgorithmProvider defined in + // defaults.go (which tend to be stable for backward compatibility) + ErrDiskConflict = newPredicateFailureError("NoDiskConflict") + ErrVolumeZoneConflict = newPredicateFailureError("NoVolumeZoneConflict") + ErrNodeSelectorNotMatch = newPredicateFailureError("MatchNodeSelector") + ErrPodNotMatchHostName = newPredicateFailureError("HostName") + ErrPodNotFitsHostPorts = newPredicateFailureError("PodFitsHostPorts") + ErrNodeLabelPresenceViolated = newPredicateFailureError("CheckNodeLabelPresence") + ErrServiceAffinityViolated = newPredicateFailureError("CheckServiceAffinity") + ErrMaxVolumeCountExceeded = newPredicateFailureError("MaxVolumeCount") + // ErrFakePredicateError is used for test only. The fake predicates returning false also returns error + // as ErrFakePredicateError. + ErrFakePredicateError = newPredicateFailureError("false") +) + // InsufficientResourceError is an error type that indicates what kind of resource limit is // hit and caused the unfitting failure. 
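The new error.go exports one sentinel failure value per predicate, named after the string the predicate is registered under in defaults.go, so callers can recover the predicate name from the error without parsing messages. A self-contained sketch of that sentinel-error pattern, with a local predicateError type standing in for the scheduler's PredicateFailureError:

package main

import "fmt"

type predicateError struct{ name string }

func (e *predicateError) Error() string { return fmt.Sprintf("Predicate %s failed", e.name) }

// One sentinel value per predicate; the predicate name travels with the error,
// so callers never have to parse the message.
var (
	errPodNotFitsHostPorts  = &predicateError{name: "PodFitsHostPorts"}
	errNodeSelectorNotMatch = &predicateError{name: "MatchNodeSelector"}
)

// reasonFor maps a predicate failure back to the name it was registered under.
func reasonFor(err error) string {
	if pe, ok := err.(*predicateError); ok {
		return pe.name
	}
	return "UnexpectedPredicateFailureType"
}

func main() {
	fmt.Println(reasonFor(errPodNotFitsHostPorts))  // PodFitsHostPorts
	fmt.Println(reasonFor(errNodeSelectorNotMatch)) // MatchNodeSelector
	fmt.Println(reasonFor(fmt.Errorf("boom")))      // UnexpectedPredicateFailureType
}
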
type InsufficientResourceError struct { // resourceName is the name of the resource that is insufficient - resourceName string + ResourceName string requested int64 used int64 capacity int64 @@ -36,7 +52,7 @@ type InsufficientResourceError struct { func newInsufficientResourceError(resourceName string, requested, used, capacity int64) *InsufficientResourceError { return &InsufficientResourceError{ - resourceName: resourceName, + ResourceName: resourceName, requested: requested, used: used, capacity: capacity, @@ -45,5 +61,17 @@ func newInsufficientResourceError(resourceName string, requested, used, capacity func (e *InsufficientResourceError) Error() string { return fmt.Sprintf("Node didn't have enough resource: %s, requested: %d, used: %d, capacity: %d", - e.resourceName, e.requested, e.used, e.capacity) + e.ResourceName, e.requested, e.used, e.capacity) +} + +type PredicateFailureError struct { + PredicateName string +} + +func newPredicateFailureError(predicateName string) *PredicateFailureError { + return &PredicateFailureError{predicateName} +} + +func (e *PredicateFailureError) Error() string { + return fmt.Sprintf("Predicate %s failed", e.PredicateName) } diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 8ded8c303b0..7c8530e3be3 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -127,7 +127,7 @@ func NoDiskConflict(pod *api.Pod, nodeName string, nodeInfo *schedulercache.Node for _, v := range pod.Spec.Volumes { for _, ev := range nodeInfo.Pods() { if isVolumeConflict(v, ev) { - return false, nil + return false, ErrDiskConflict } } } @@ -229,7 +229,8 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, nodeName string, nodeI numNewVolumes := len(newVolumes) if numExistingVolumes+numNewVolumes > c.maxVolumes { - return false, nil + // violates MaxEBSVolumeCount or MaxGCEPDVolumeCount + return false, ErrMaxVolumeCountExceeded } return true, nil @@ -362,7 +363,7 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *s nodeV, _ := nodeConstraints[k] if v != nodeV { glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, nodeName, pvName, k) - return false, nil + return false, ErrVolumeZoneConflict } } } @@ -421,20 +422,9 @@ func podName(pod *api.Pod) string { return pod.Namespace + "/" + pod.Name } -// PodFitsResources calculates fit based on requested, rather than used resources -func (r *ResourceFit) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { - info, err := r.info.GetNodeInfo(nodeName) - if err != nil { - return false, err - } - +func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, info *api.Node) (bool, error) { allocatable := info.Status.Allocatable allowedPodNumber := allocatable.Pods().Value() - if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber { - return false, - newInsufficientResourceError(podCountResourceName, 1, int64(len(nodeInfo.Pods())), allowedPodNumber) - } - podRequest := getResourceRequest(pod) if podRequest.milliCPU == 0 && podRequest.memory == 0 { return true, nil @@ -442,6 +432,7 @@ func (r *ResourceFit) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo * totalMilliCPU := allocatable.Cpu().MilliValue() totalMemory := allocatable.Memory().Value() + if totalMilliCPU < podRequest.milliCPU+nodeInfo.RequestedResource().MilliCPU 
{ return false, newInsufficientResourceError(cpuResourceName, podRequest.milliCPU, nodeInfo.RequestedResource().MilliCPU, totalMilliCPU) @@ -455,15 +446,30 @@ func (r *ResourceFit) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo * return true, nil } +func (r *NodeStatus) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + info, err := r.info.GetNodeInfo(nodeName) + if err != nil { + return false, err + } + // TODO: move the following podNumber check to podFitsResourcesInternal when Kubelet allows podNumber check (See #20263). + allocatable := info.Status.Allocatable + allowedPodNumber := allocatable.Pods().Value() + if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber { + return false, + newInsufficientResourceError(podCountResourceName, 1, int64(len(nodeInfo.Pods())), allowedPodNumber) + } + return podFitsResourcesInternal(pod, nodeName, nodeInfo, info) +} + func NewResourceFitPredicate(info NodeInfo) algorithm.FitPredicate { - fit := &ResourceFit{ + fit := &NodeStatus{ info: info, } return fit.PodFitsResources } func NewSelectorMatchPredicate(info NodeInfo) algorithm.FitPredicate { - selector := &NodeSelector{ + selector := &NodeStatus{ info: info, } return selector.PodSelectorMatches @@ -542,19 +548,25 @@ type NodeSelector struct { info NodeInfo } -func (n *NodeSelector) PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func (n *NodeStatus) PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { node, err := n.info.GetNodeInfo(nodeName) if err != nil { return false, err } - return PodMatchesNodeLabels(pod, node), nil + if PodMatchesNodeLabels(pod, node) { + return true, nil + } + return false, ErrNodeSelectorNotMatch } func PodFitsHost(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { if len(pod.Spec.NodeName) == 0 { return true, nil } - return pod.Spec.NodeName == nodeName, nil + if pod.Spec.NodeName == nodeName { + return true, nil + } + return false, ErrPodNotMatchHostName } type NodeLabelChecker struct { @@ -594,7 +606,7 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, nodeName string, for _, label := range n.labels { exists = nodeLabels.Has(label) if (exists && !n.presence) || (!exists && n.presence) { - return false, nil + return false, ErrNodeLabelPresenceViolated } } return true, nil @@ -692,7 +704,10 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, nodeName string, no } // check if the node matches the selector - return affinitySelector.Matches(labels.Set(node.Labels)), nil + if affinitySelector.Matches(labels.Set(node.Labels)) { + return true, nil + } + return false, ErrServiceAffinityViolated } func PodFitsHostPorts(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { @@ -706,7 +721,7 @@ func PodFitsHostPorts(pod *api.Pod, nodeName string, nodeInfo *schedulercache.No continue } if existingPorts[wport] { - return false, nil + return false, ErrPodNotFitsHostPorts } } return true, nil @@ -735,3 +750,41 @@ func haveSame(a1, a2 []string) bool { } return false } + +type NodeStatus struct { + info NodeInfo +} + +func GeneralPredicates(info NodeInfo) algorithm.FitPredicate { + node := &NodeStatus{ + info: info, + } + return node.SchedulerGeneralPredicates +} + +func (n *NodeStatus) SchedulerGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { + node, err := n.info.GetNodeInfo(nodeName) + if err 
!= nil { + return false, err + } + return RunGeneralPredicates(pod, nodeName, nodeInfo, node) +} + +func RunGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, node *api.Node) (bool, error) { + fit, err := podFitsResourcesInternal(pod, nodeName, nodeInfo, node) + if !fit { + return fit, err + } + fit, err = PodFitsHost(pod, nodeName, nodeInfo) + if !fit { + return fit, err + } + fit, err = PodFitsHostPorts(pod, nodeName, nodeInfo) + if !fit { + return fit, err + } + if !PodMatchesNodeLabels(pod, node) { + return false, ErrNodeSelectorNotMatch + } + return true, nil +} diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go index be4a8e87f36..b0fcf6472ca 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -159,7 +159,7 @@ func TestPodFitsResources(t *testing.T) { for _, test := range enoughPodsTests { node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}} - fit := ResourceFit{FakeNodeInfo(node)} + fit := NodeStatus{FakeNodeInfo(node)} fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) @@ -204,7 +204,7 @@ func TestPodFitsResources(t *testing.T) { for _, test := range notEnoughPodsTests { node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1)}} - fit := ResourceFit{FakeNodeInfo(node)} + fit := NodeStatus{FakeNodeInfo(node)} fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) @@ -252,11 +252,14 @@ func TestPodFitsHost(t *testing.T) { for _, test := range tests { result, err := PodFitsHost(test.pod, test.node, schedulercache.NewNodeInfo()) - if err != nil { + if !reflect.DeepEqual(err, ErrPodNotMatchHostName) && err != nil { + t.Errorf("unexpected error: %v", err) + } + if result == false && !reflect.DeepEqual(err, ErrPodNotMatchHostName) { t.Errorf("unexpected error: %v", err) } if result != test.fits { - t.Errorf("unexpected difference for %s: got: %v expected %v", test.test, test.fits, result) + t.Errorf("unexpected difference for %s: expected: %v got %v", test.test, test.fits, result) } } } @@ -322,7 +325,10 @@ func TestPodFitsHostPorts(t *testing.T) { } for _, test := range tests { fits, err := PodFitsHostPorts(test.pod, "machine", test.nodeInfo) - if err != nil { + if !reflect.DeepEqual(err, ErrPodNotFitsHostPorts) && err != nil { + t.Errorf("unexpected error: %v", err) + } + if fits == false && !reflect.DeepEqual(err, ErrPodNotFitsHostPorts) { t.Errorf("unexpected error: %v", err) } if test.fits != fits { @@ -404,8 +410,11 @@ func TestDiskConflicts(t *testing.T) { for _, test := range tests { ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo) - if err != nil { - t.Fatalf("unexpected error: %v", err) + if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil { + t.Errorf("unexpected error: %v", err) + } + if ok == false && !reflect.DeepEqual(err, ErrDiskConflict) { + t.Errorf("unexpected error: %v", err) } if test.isOk && !ok { t.Errorf("expected ok, got none. 
%v %s %s", test.pod, test.nodeInfo, test.test) @@ -453,8 +462,11 @@ func TestAWSDiskConflicts(t *testing.T) { for _, test := range tests { ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo) - if err != nil { - t.Fatalf("unexpected error: %v", err) + if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil { + t.Errorf("unexpected error: %v", err) + } + if ok == false && !reflect.DeepEqual(err, ErrDiskConflict) { + t.Errorf("unexpected error: %v", err) } if test.isOk && !ok { t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test) @@ -508,8 +520,11 @@ func TestRBDDiskConflicts(t *testing.T) { for _, test := range tests { ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo) - if err != nil { - t.Fatalf("unexpected error: %v", err) + if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil { + t.Errorf("unexpected error: %v", err) + } + if ok == false && !reflect.DeepEqual(err, ErrDiskConflict) { + t.Errorf("unexpected error: %v", err) } if test.isOk && !ok { t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test) @@ -980,9 +995,12 @@ func TestPodFitsSelector(t *testing.T) { for _, test := range tests { node := api.Node{ObjectMeta: api.ObjectMeta{Labels: test.labels}} - fit := NodeSelector{FakeNodeInfo(node)} + fit := NodeStatus{FakeNodeInfo(node)} fits, err := fit.PodSelectorMatches(test.pod, "machine", schedulercache.NewNodeInfo()) - if err != nil { + if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil { + t.Errorf("unexpected error: %v", err) + } + if fits == false && !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) { t.Errorf("unexpected error: %v", err) } if fits != test.fits { @@ -1041,7 +1059,10 @@ func TestNodeLabelPresence(t *testing.T) { node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}} labelChecker := NodeLabelChecker{FakeNodeInfo(node), test.labels, test.presence} fits, err := labelChecker.CheckNodeLabelPresence(test.pod, "machine", schedulercache.NewNodeInfo()) - if err != nil { + if !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) && err != nil { + t.Errorf("unexpected error: %v", err) + } + if fits == false && !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) { t.Errorf("unexpected error: %v", err) } if fits != test.fits { @@ -1181,7 +1202,10 @@ func TestServiceAffinity(t *testing.T) { nodes := []api.Node{node1, node2, node3, node4, node5} serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels} fits, err := serviceAffinity.CheckServiceAffinity(test.pod, test.node, schedulercache.NewNodeInfo()) - if err != nil { + if !reflect.DeepEqual(err, ErrServiceAffinityViolated) && err != nil { + t.Errorf("unexpected error: %v", err) + } + if fits == false && !reflect.DeepEqual(err, ErrServiceAffinityViolated) { t.Errorf("unexpected error: %v", err) } if fits != test.fits { @@ -1401,7 +1425,7 @@ func TestEBSVolumeCountConflicts(t *testing.T) { for _, test := range tests { pred := NewMaxPDVolumeCountPredicate(filter, test.maxVols, pvInfo, pvcInfo) fits, err := pred(test.newPod, "some-node", schedulercache.NewNodeInfo(test.existingPods...)) - if err != nil { + if err != nil && !reflect.DeepEqual(err, ErrMaxVolumeCountExceeded) { t.Errorf("unexpected error: %v", err) } @@ -1455,3 +1479,83 @@ func TestPredicatesRegistered(t *testing.T) { } } } + +func newPodWithPort(hostPorts ...int) *api.Pod { + networkPorts := []api.ContainerPort{} + for _, port := range hostPorts { + networkPorts = 
append(networkPorts, api.ContainerPort{HostPort: port}) + } + return &api.Pod{ + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Ports: networkPorts, + }, + }, + }, + } +} + +func TestRunGeneralPredicates(t *testing.T) { + resourceTests := []struct { + pod *api.Pod + nodeName string + nodeInfo *schedulercache.NodeInfo + node *api.Node + fits bool + test string + wErr error + }{ + { + pod: &api.Pod{}, + nodeName: "machine1", + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(resourceRequest{milliCPU: 9, memory: 19})), + node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}}, + fits: true, + wErr: nil, + test: "no resources/port/host requested always fits", + }, + { + pod: newResourcePod(resourceRequest{milliCPU: 8, memory: 10}), + nodeName: "machine1", + nodeInfo: schedulercache.NewNodeInfo( + newResourcePod(resourceRequest{milliCPU: 5, memory: 19})), + node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}}, + fits: false, + wErr: newInsufficientResourceError("CPU", 8, 5, 10), + test: "not enough cpu resource", + }, + { + pod: &api.Pod{ + Spec: api.PodSpec{ + NodeName: "machine2", + }, + }, + nodeName: "machine1", + nodeInfo: schedulercache.NewNodeInfo(), + node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}}, + fits: false, + wErr: ErrPodNotMatchHostName, + test: "host not match", + }, + { + pod: newPodWithPort(123), + nodeName: "machine1", + nodeInfo: schedulercache.NewNodeInfo(newPodWithPort(123)), + node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}}, + fits: false, + wErr: ErrPodNotFitsHostPorts, + test: "hostport conflict", + }, + } + for _, test := range resourceTests { + fits, err := RunGeneralPredicates(test.pod, test.nodeName, test.nodeInfo, test.node) + if !reflect.DeepEqual(err, test.wErr) { + t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) + } + if fits != test.fits { + t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits) + } + } +} diff --git a/plugin/pkg/scheduler/algorithm/types.go b/plugin/pkg/scheduler/algorithm/types.go index ffce2c71583..186a1cbfd32 100644 --- a/plugin/pkg/scheduler/algorithm/types.go +++ b/plugin/pkg/scheduler/algorithm/types.go @@ -23,6 +23,7 @@ import ( ) // FitPredicate is a function that indicates if a pod fits into an existing node. +// The failure information is given by the error. type FitPredicate func(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) type PriorityFunction func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister NodeLister) (schedulerapi.HostPriorityList, error) diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 6242a2ba9a8..7f6d05780ff 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -77,19 +77,27 @@ func init() { // of already-installed packages required by the pod will be preferred over nodes with no already-installed // packages required by the pod or a small total size of already-installed packages required by the pod. 
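RunGeneralPredicates, exercised by the TestRunGeneralPredicates table above, evaluates the shared checks in a fixed order (resources, host name, host ports, node selector) and returns the first failure together with its error. A compact sketch of that short-circuiting chain, with toy predicate functions in place of PodFitsHost and PodFitsHostPorts:

package main

import (
	"errors"
	"fmt"
)

type pod struct {
	nodeName string
	hostPort int
}

type check func(p pod) (bool, error)

var (
	errHostName  = errors.New("Predicate HostName failed")
	errHostPorts = errors.New("Predicate PodFitsHostPorts failed")
)

// runAll mirrors RunGeneralPredicates: run the checks in order and return the
// first failure with its error, or (true, nil) when everything fits.
func runAll(p pod, checks ...check) (bool, error) {
	for _, c := range checks {
		if fit, err := c(p); !fit {
			return false, err
		}
	}
	return true, nil
}

func main() {
	fitsHost := func(p pod) (bool, error) {
		if p.nodeName == "" || p.nodeName == "machine1" {
			return true, nil
		}
		return false, errHostName
	}
	fitsPorts := func(p pod) (bool, error) {
		if p.hostPort == 80 { // pretend port 80 is already taken on the node
			return false, errHostPorts
		}
		return true, nil
	}

	fmt.Println(runAll(pod{nodeName: "machine2"}, fitsHost, fitsPorts)) // HostName failure
	fmt.Println(runAll(pod{hostPort: 80}, fitsHost, fitsPorts))         // HostPorts failure
	fmt.Println(runAll(pod{nodeName: "machine1"}, fitsHost, fitsPorts)) // true <nil>
}

Because each failure carries its own sentinel error, the caller never needs to know which link in the chain rejected the pod.
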
factory.RegisterPriorityFunction("ImageLocalityPriority", priorities.ImageLocalityPriority, 1) + // Fit is defined based on the absence of port conflicts. + // This predicate is actually a default predicate, because it is invoked from + // predicates.GeneralPredicates() + factory.RegisterFitPredicate("PodFitsHostPorts", predicates.PodFitsHostPorts) + // Fit is determined by resource availability. + // This predicate is actually a default predicate, because it is invoked from + // predicates.GeneralPredicates() + factory.RegisterFitPredicateFactory( + "PodFitsResources", + func(args factory.PluginFactoryArgs) algorithm.FitPredicate { + return predicates.NewResourceFitPredicate(args.NodeInfo) + }, + ) + // Fit is determined by the presence of the Host parameter and a string match + // This predicate is actually a default predicate, because it is invoked from + // predicates.GeneralPredicates() + factory.RegisterFitPredicate("HostName", predicates.PodFitsHost) } func defaultPredicates() sets.String { return sets.NewString( - // Fit is defined based on the absence of port conflicts. - factory.RegisterFitPredicate("PodFitsHostPorts", predicates.PodFitsHostPorts), - // Fit is determined by resource availability. - factory.RegisterFitPredicateFactory( - "PodFitsResources", - func(args factory.PluginFactoryArgs) algorithm.FitPredicate { - return predicates.NewResourceFitPredicate(args.NodeInfo) - }, - ), // Fit is determined by non-conflicting disk volumes. factory.RegisterFitPredicate("NoDiskConflict", predicates.NoDiskConflict), // Fit is determined by volume zone requirements. @@ -106,9 +114,6 @@ func defaultPredicates() sets.String { return predicates.NewSelectorMatchPredicate(args.NodeInfo) }, ), - // Fit is determined by the presence of the Host parameter and a string match - factory.RegisterFitPredicate("HostName", predicates.PodFitsHost), - // Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node factory.RegisterFitPredicateFactory( "MaxEBSVolumeCount", @@ -118,7 +123,6 @@ func defaultPredicates() sets.String { return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilter, maxVols, args.PVInfo, args.PVCInfo) }, ), - // Fit is determined by whether or not there would be too many GCE PD volumes attached to the node factory.RegisterFitPredicateFactory( "MaxGCEPDVolumeCount", @@ -128,6 +132,14 @@ func defaultPredicates() sets.String { return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilter, maxVols, args.PVInfo, args.PVCInfo) }, ), + // GeneralPredicates are the predicates that are enforced by all Kubernetes components + // (e.g. 
kubelet and all schedulers) + factory.RegisterFitPredicateFactory( + "GeneralPredicates", + func(args factory.PluginFactoryArgs) algorithm.FitPredicate { + return predicates.GeneralPredicates(args.NodeInfo) + }, + ), ) } diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index bda82048435..8240534ed20 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -84,6 +84,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe } filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders) + if err != nil { return "", err } @@ -130,7 +131,7 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No for _, node := range nodes.Items { fits := true - for name, predicate := range predicateFuncs { + for _, predicate := range predicateFuncs { fit, err := predicate(pod, node.Name, nodeNameToInfo[node.Name]) if err != nil { switch e := err.(type) { @@ -139,6 +140,11 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No err := fmt.Errorf("got InsufficientResourceError: %v, but also fit='true' which is unexpected", e) return api.NodeList{}, FailedPredicateMap{}, err } + case *predicates.PredicateFailureError: + if fit { + err := fmt.Errorf("got PredicateFailureError: %v, but also fit='true' which is unexpected", e) + return api.NodeList{}, FailedPredicateMap{}, err + } default: return api.NodeList{}, FailedPredicateMap{}, err } @@ -149,10 +155,16 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No failedPredicateMap[node.Name] = sets.String{} } if re, ok := err.(*predicates.InsufficientResourceError); ok { - failedPredicateMap[node.Name].Insert(re.Error()) + failedPredicateMap[node.Name].Insert(fmt.Sprintf("Insufficient %s", re.ResourceName)) break } - failedPredicateMap[node.Name].Insert(name) + if re, ok := err.(*predicates.PredicateFailureError); ok { + failedPredicateMap[node.Name].Insert(re.PredicateName) + break + } else { + err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err) + return api.NodeList{}, FailedPredicateMap{}, err + } break } } diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index 222f3fac1b5..0bce1fedd32 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -20,19 +20,21 @@ import ( "fmt" "math" "math/rand" + "reflect" "strconv" "testing" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + algorithmpredicates "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing" ) func falsePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { - return false, nil + return false, algorithmpredicates.ErrFakePredicateError } func truePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { @@ -40,11 +42,17 @@ func truePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeI } func matchesPredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { - return pod.Name == nodeName, nil + if pod.Name == nodeName { + return true, nil 
+ } + return false, algorithmpredicates.ErrFakePredicateError } func hasNoPodsPredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) { - return len(nodeInfo.Pods()) == 0, nil + if len(nodeInfo.Pods()) == 0 { + return true, nil + } + return false, algorithmpredicates.ErrFakePredicateError } func numericPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { @@ -176,6 +184,7 @@ func TestGenericScheduler(t *testing.T) { pods []*api.Pod expectedHosts sets.String expectsErr bool + wErr error }{ { predicates: map[string]algorithm.FitPredicate{"false": falsePredicate}, @@ -183,6 +192,7 @@ func TestGenericScheduler(t *testing.T) { nodes: []string{"machine1", "machine2"}, expectsErr: true, name: "test 1", + wErr: algorithmpredicates.ErrFakePredicateError, }, { predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, @@ -190,6 +200,7 @@ func TestGenericScheduler(t *testing.T) { nodes: []string{"machine1", "machine2"}, expectedHosts: sets.NewString("machine1", "machine2"), name: "test 2", + wErr: nil, }, { // Fits on a machine where the pod ID matches the machine name @@ -199,6 +210,7 @@ func TestGenericScheduler(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}}, expectedHosts: sets.NewString("machine2"), name: "test 3", + wErr: nil, }, { predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, @@ -206,6 +218,7 @@ func TestGenericScheduler(t *testing.T) { nodes: []string{"3", "2", "1"}, expectedHosts: sets.NewString("3"), name: "test 4", + wErr: nil, }, { predicates: map[string]algorithm.FitPredicate{"matches": matchesPredicate}, @@ -214,6 +227,7 @@ func TestGenericScheduler(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}}, expectedHosts: sets.NewString("2"), name: "test 5", + wErr: nil, }, { predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, @@ -222,6 +236,7 @@ func TestGenericScheduler(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}}, expectedHosts: sets.NewString("1"), name: "test 6", + wErr: nil, }, { predicates: map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate}, @@ -229,6 +244,7 @@ func TestGenericScheduler(t *testing.T) { nodes: []string{"3", "2", "1"}, expectsErr: true, name: "test 7", + wErr: nil, }, { predicates: map[string]algorithm.FitPredicate{ @@ -252,20 +268,20 @@ func TestGenericScheduler(t *testing.T) { nodes: []string{"1", "2"}, expectsErr: true, name: "test 8", + wErr: nil, }, } - for _, test := range tests { random := rand.New(rand.NewSource(0)) scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, random) machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) if test.expectsErr { if err == nil { - t.Error("Unexpected non-error") + t.Errorf("Unexpected non-error at %s", test.name) } } else { - if err != nil { - t.Errorf("Unexpected error: %v", err) + if !reflect.DeepEqual(err, test.wErr) { + t.Errorf("Failed : %s, Unexpected error: %v, expected: %v", test.name, err, test.wErr) } if !test.expectedHosts.Has(machine) { t.Errorf("Failed : %s, Expected: %s, Saw: %s", test.name, test.expectedHosts, machine) @@ -312,9 +328,9 @@ func TestFindFitSomeError(t *testing.T) { "2": schedulercache.NewNodeInfo(), "1": schedulercache.NewNodeInfo(pod), } - _, predicateMap, err := findNodesThatFit(pod, 
nodeNameToInfo, predicates, makeNodeList(nodes), nil) - if err != nil { + _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, predicates, makeNodeList(nodes), nil) + if err != nil && !reflect.DeepEqual(err, algorithmpredicates.ErrFakePredicateError) { t.Errorf("unexpected error: %v", err) } @@ -330,7 +346,7 @@ func TestFindFitSomeError(t *testing.T) { if !found { t.Errorf("failed to find node: %s in %v", node, predicateMap) } - if len(failures) != 1 || !failures.Has("match") { + if len(failures) != 1 || !failures.Has("false") { t.Errorf("unexpected failures: %v", failures) } }
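
After this change findNodesThatFit records why each node was filtered out: an InsufficientResourceError becomes an "Insufficient CPU"-style entry, a PredicateFailureError contributes its predicate name, and any other error type aborts scheduling. A self-contained sketch of that classification step, using local error types in place of the predicates package:

package main

import "fmt"

type insufficientResourceError struct{ resource string }

func (e *insufficientResourceError) Error() string { return "insufficient " + e.resource }

type predicateFailureError struct{ predicateName string }

func (e *predicateFailureError) Error() string { return "Predicate " + e.predicateName + " failed" }

// classify turns a predicate error into the string stored per node in the
// failed-predicate map, or returns a fatal error for unknown error types.
func classify(err error) (string, error) {
	switch e := err.(type) {
	case *insufficientResourceError:
		return fmt.Sprintf("Insufficient %s", e.resource), nil
	case *predicateFailureError:
		return e.predicateName, nil
	default:
		return "", fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected", e)
	}
}

func main() {
	failed := map[string][]string{}

	reason, _ := classify(&insufficientResourceError{resource: "CPU"})
	failed["machine1"] = append(failed["machine1"], reason)

	reason, _ = classify(&predicateFailureError{predicateName: "PodFitsHostPorts"})
	failed["machine2"] = append(failed["machine2"], reason)

	if _, err := classify(fmt.Errorf("boom")); err != nil {
		fmt.Println("fatal:", err)
	}
	fmt.Println(failed) // map[machine1:[Insufficient CPU] machine2:[PodFitsHostPorts]]
}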