move predicates into library (address #12744)

DONE:
1. refactor all predicates: each predicate now returns fitOrNot (bool) and error (Error), where the error is of type
	PredicateFailureError or InsufficientResourceError. (Violations of either MaxEBSVolumeCount or
	MaxGCEPDVolumeCount return the same error, ErrMaxVolumeCountExceeded.) A sketch of how callers
	consume this contract follows this list.
2. GeneralPredicates() is a predicate function that bundles several other predicate functions (PodFitsResources,
	PodFitsHost, PodFitsHostPorts). It is registered as one of the predicates in DefaultAlgorithmProvider, is
	called in canAdmitPod() in Kubelet, and should be called by other components (like the rescheduler, etc.)
	if necessary. See the discussion in issue #12744.
3. remove podNumber check from GeneralPredicates
4. HostName is now verified in Kubelet's canAdmitPod(). Added TestHandleHostNameConflicts in kubelet_test.go.
5. add getNodeAnyWay() method in Kubelet to get node information even in standalone mode
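
For illustration only (not part of this commit): a minimal sketch, in the spirit of canAdmitPod() in the diff below,
of how a caller is expected to consume the refactored contract. The helper name admitPod() is hypothetical; the
library names (RunGeneralPredicates, PredicateFailureError, InsufficientResourceError) are the ones introduced here.

	package example

	import (
		"fmt"

		"k8s.io/kubernetes/pkg/api"
		"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
		"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
	)

	// admitPod returns whether the pod fits the node, plus a single-word reason on failure.
	func admitPod(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, node *api.Node) (bool, string) {
		fit, err := predicates.RunGeneralPredicates(pod, nodeName, nodeInfo, node)
		if fit {
			return true, ""
		}
		switch e := err.(type) {
		case *predicates.PredicateFailureError:
			// A boolean-style predicate (HostName, PodFitsHostPorts, MatchNodeSelector, ...) failed.
			return false, e.PredicateName
		case *predicates.InsufficientResourceError:
			// A resource request (CPU, Memory, ...) could not be satisfied.
			return false, fmt.Sprintf("OutOf%s", e.ResourceName)
		default:
			// By convention any other error type is unexpected.
			return false, "UnexpectedPredicateFailureType"
		}
	}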

TODO:
1. determine which predicates should be included in GeneralPredicates()
2. separate GeneralPredicates() into:
	a. GeneralPredicatesEvictPod() and
	b. GeneralPredicatesNotEvictPod()
3. DaemonSet should use GeneralPredicates() (an illustrative caller sketch follows this list)
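
Illustrative sketch only (not part of this commit), relating to TODO item 3: a component such as the DaemonSet
controller could reuse the shared predicate instead of re-implementing the checks. GeneralPredicates(info) and the
FitPredicate calling convention come from this commit; the NodeInfo implementation and the helper name
shouldPlaceOnNode() are assumptions of the sketch.

	package example

	import (
		"k8s.io/kubernetes/pkg/api"
		"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
		"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
	)

	// shouldPlaceOnNode runs the shared GeneralPredicates against one node,
	// given the pods already assigned to that node.
	func shouldPlaceOnNode(info predicates.NodeInfo, pod *api.Pod, nodeName string, podsOnNode []*api.Pod) (bool, error) {
		fitPredicate := predicates.GeneralPredicates(info)
		nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
		return fitPredicate(pod, nodeName, nodeInfo)
	}
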
HaiyangDING 2016-01-06 09:10:59 +08:00
parent e31177219b
commit 41ed85479a
10 changed files with 475 additions and 92 deletions

View File

@@ -88,6 +88,7 @@ import (
     "k8s.io/kubernetes/pkg/volume"
     "k8s.io/kubernetes/pkg/watch"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
+    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
     "k8s.io/kubernetes/third_party/golang/expansion"
 )
@@ -2323,32 +2324,63 @@ func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) {
         Message: "Pod " + message})
 }
+
+// getNodeAnyWay() must return a *api.Node which is required by RunGeneralPredicates().
+// The *api.Node is obtained as follows:
+// Return kubelet's nodeInfo for this node, except on error or if in standalone mode,
+// in which case return a manufactured nodeInfo representing a node with no pods,
+// zero capacity, and the default labels.
+func (kl *Kubelet) getNodeAnyWay() (*api.Node, error) {
+    if !kl.standaloneMode {
+        if n, err := kl.nodeInfo.GetNodeInfo(kl.nodeName); err == nil {
+            return n, nil
+        }
+    }
+    return kl.initialNodeStatus()
+}
+
 // canAdmitPod determines if a pod can be admitted, and gives a reason if it
 // cannot. "pod" is new pod, while "pods" include all admitted pods plus the
 // new pod. The function returns a boolean value indicating whether the pod
 // can be admitted, a brief single-word reason and a message explaining why
 // the pod cannot be admitted.
+//
+// This needs to be kept in sync with the scheduler's and daemonset's fit predicates,
+// otherwise there will inevitably be pod delete create loops. This will be fixed
+// once we can extract these predicates into a common library. (#12744)
 func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) {
-    if hasHostPortConflicts(pods) {
-        return false, "HostPortConflict", "cannot start the pod due to host port conflict."
+    node, err := kl.getNodeAnyWay()
+    if err != nil {
+        glog.Errorf("Cannot get Node info: %v", err)
+        return false, "InvalidNodeInfo", "Kubelet cannot get node info."
     }
-    if !kl.matchesNodeSelector(pod) {
-        return false, "NodeSelectorMismatching", "cannot be started due to node selector mismatch"
+    otherPods := []*api.Pod{}
+    for _, p := range pods {
+        if p != pod {
+            otherPods = append(otherPods, p)
+        }
     }
-    cpu, memory := kl.hasInsufficientfFreeResources(pods)
-    if cpu {
-        return false, "InsufficientFreeCPU", "cannot start the pod due to insufficient free CPU."
-    } else if memory {
-        return false, "InsufficientFreeMemory", "cannot be started due to insufficient free memory"
+    nodeInfo := schedulercache.CreateNodeNameToInfoMap(otherPods)[kl.nodeName]
+    fit, err := predicates.RunGeneralPredicates(pod, kl.nodeName, nodeInfo, node)
+    if !fit {
+        if re, ok := err.(*predicates.PredicateFailureError); ok {
+            reason := re.PredicateName
+            message := re.Error()
+            glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
+            return fit, reason, message
+        }
+        if re, ok := err.(*predicates.InsufficientResourceError); ok {
+            reason := fmt.Sprintf("OutOf%s", re.ResourceName)
+            message := re.Error()
+            glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
+            return fit, reason, message
+        }
+        reason := "UnexpectedPredicateFailureType"
+        message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
+        glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
+        return fit, reason, message
     }
+    // TODO: When disk space scheduling is implemented (#11976), remove the out-of-disk check here and
+    // add the disk space predicate to predicates.GeneralPredicates.
     if kl.isOutOfDisk() {
+        glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to isOutOfDisk")
         return false, "OutOfDisk", "cannot be started due to lack of disk space."
     }
     return true, "", ""
 }

View File

@@ -2365,7 +2365,14 @@ func TestHandlePortConflicts(t *testing.T) {
     testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
     testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
-    spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}
+    kl.nodeLister = testNodeLister{nodes: []api.Node{
+        {ObjectMeta: api.ObjectMeta{Name: kl.nodeName}},
+    }}
+    kl.nodeInfo = testNodeInfo{nodes: []api.Node{
+        {ObjectMeta: api.ObjectMeta{Name: kl.nodeName}},
+    }}
+    spec := api.PodSpec{NodeName: kl.nodeName, Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}
     pods := []*api.Pod{
         {
             ObjectMeta: api.ObjectMeta{
@@ -2388,17 +2395,90 @@ func TestHandlePortConflicts(t *testing.T) {
     pods[1].CreationTimestamp = unversioned.NewTime(time.Now())
     pods[0].CreationTimestamp = unversioned.NewTime(time.Now().Add(1 * time.Second))
     // The newer pod should be rejected.
-    conflictedPod := pods[0]
+    notfittingPod := pods[0]
+    fittingPod := pods[1]
     kl.HandlePodAdditions(pods)
     // Check pod status stored in the status map.
-    status, found := kl.statusManager.GetPodStatus(conflictedPod.UID)
+    // notfittingPod should be Failed
+    status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
     if !found {
-        t.Fatalf("status of pod %q is not found in the status map", conflictedPod.UID)
+        t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID)
     }
     if status.Phase != api.PodFailed {
         t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
     }
+    // fittingPod should be Pending
+    status, found = kl.statusManager.GetPodStatus(fittingPod.UID)
+    if !found {
+        t.Fatalf("status of pod %q is not found in the status map", fittingPod.UID)
+    }
+    if status.Phase != api.PodPending {
+        t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase)
+    }
+}
+
+// Tests that we handle host name conflicts correctly by setting the failed status in status map.
+func TestHandleHostNameConflicts(t *testing.T) {
+    testKubelet := newTestKubelet(t)
+    kl := testKubelet.kubelet
+    testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
+    testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
+    testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
+    kl.nodeLister = testNodeLister{nodes: []api.Node{
+        {ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}},
+    }}
+    kl.nodeInfo = testNodeInfo{nodes: []api.Node{
+        {ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}},
+    }}
+    pods := []*api.Pod{
+        {
+            ObjectMeta: api.ObjectMeta{
+                UID:       "123456789",
+                Name:      "notfittingpod",
+                Namespace: "foo",
+            },
+            Spec: api.PodSpec{
+                // default NodeName in test is 127.0.0.1
+                NodeName: "127.0.0.2",
+            },
+        },
+        {
+            ObjectMeta: api.ObjectMeta{
+                UID:       "987654321",
+                Name:      "fittingpod",
+                Namespace: "foo",
+            },
+            Spec: api.PodSpec{
+                // default NodeName in test is 127.0.0.1
+                NodeName: "127.0.0.1",
+            },
+        },
+    }
+    notfittingPod := pods[0]
+    fittingPod := pods[1]
+    kl.HandlePodAdditions(pods)
+    // Check pod status stored in the status map.
+    // notfittingPod should be Failed
+    status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
+    if !found {
+        t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID)
+    }
+    if status.Phase != api.PodFailed {
+        t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
+    }
+    // fittingPod should be Pending
+    status, found = kl.statusManager.GetPodStatus(fittingPod.UID)
+    if !found {
+        t.Fatalf("status of pod %q is not found in the status map", fittingPod.UID)
+    }
+    if status.Phase != api.PodPending {
+        t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase)
+    }
 }
 
 // Tests that we handle not matching labels selector correctly by setting the failed status in status map.
@@ -2434,9 +2514,11 @@ func TestHandleNodeSelector(t *testing.T) {
     }
     // The first pod should be rejected.
     notfittingPod := pods[0]
+    fittingPod := pods[1]
     kl.HandlePodAdditions(pods)
     // Check pod status stored in the status map.
+    // notfittingPod should be Failed
     status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
     if !found {
         t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID)
@@ -2444,21 +2526,46 @@ func TestHandleNodeSelector(t *testing.T) {
     if status.Phase != api.PodFailed {
         t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
     }
+    // fittingPod should be Pending
+    status, found = kl.statusManager.GetPodStatus(fittingPod.UID)
+    if !found {
+        t.Fatalf("status of pod %q is not found in the status map", fittingPod.UID)
+    }
+    if status.Phase != api.PodPending {
+        t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase)
+    }
 }
 
 // Tests that we handle exceeded resources correctly by setting the failed status in status map.
 func TestHandleMemExceeded(t *testing.T) {
     testKubelet := newTestKubelet(t)
     kl := testKubelet.kubelet
-    testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{MemoryCapacity: 100}, nil)
+    kl.nodeLister = testNodeLister{nodes: []api.Node{
+        {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
+            Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: api.ResourceList{
+                api.ResourceCPU:    *resource.NewMilliQuantity(10, resource.DecimalSI),
+                api.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI),
+                api.ResourcePods:   *resource.NewQuantity(40, resource.DecimalSI),
+            }}},
+    }}
+    kl.nodeInfo = testNodeInfo{nodes: []api.Node{
+        {ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
+            Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: api.ResourceList{
+                api.ResourceCPU:    *resource.NewMilliQuantity(10, resource.DecimalSI),
+                api.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI),
+                api.ResourcePods:   *resource.NewQuantity(40, resource.DecimalSI),
+            }}},
+    }}
+    testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
     testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
     testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
-    spec := api.PodSpec{Containers: []api.Container{{Resources: api.ResourceRequirements{
-        Requests: api.ResourceList{
-            "memory": resource.MustParse("90"),
-        },
-    }}}}
+    spec := api.PodSpec{NodeName: kl.nodeName,
+        Containers: []api.Container{{Resources: api.ResourceRequirements{
+            Requests: api.ResourceList{
+                "memory": resource.MustParse("90"),
+            },
+        }}}}
     pods := []*api.Pod{
         {
             ObjectMeta: api.ObjectMeta{
@@ -2482,9 +2589,11 @@ func TestHandleMemExceeded(t *testing.T) {
     pods[0].CreationTimestamp = unversioned.NewTime(time.Now().Add(1 * time.Second))
     // The newer pod should be rejected.
     notfittingPod := pods[0]
+    fittingPod := pods[1]
     kl.HandlePodAdditions(pods)
     // Check pod status stored in the status map.
+    // notfittingPod should be Failed
    status, found := kl.statusManager.GetPodStatus(notfittingPod.UID)
     if !found {
         t.Fatalf("status of pod %q is not found in the status map", notfittingPod.UID)
@@ -2492,6 +2601,14 @@ func TestHandleMemExceeded(t *testing.T) {
     if status.Phase != api.PodFailed {
         t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
     }
+    // fittingPod should be Pending
+    status, found = kl.statusManager.GetPodStatus(fittingPod.UID)
+    if !found {
+        t.Fatalf("status of pod %q is not found in the status map", fittingPod.UID)
+    }
+    if status.Phase != api.PodPending {
+        t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase)
+    }
 }
 
 // TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal.
@@ -2500,6 +2617,12 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
     testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
     testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
     testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
+    versionInfo := &cadvisorapi.VersionInfo{
+        KernelVersion:      "3.16.0-0.bpo.4-amd64",
+        ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
+        DockerVersion:      "1.5.0",
+    }
+    testKubelet.fakeCadvisor.On("VersionInfo").Return(versionInfo, nil)
     kl := testKubelet.kubelet
     pods := []*api.Pod{

View File

@@ -24,6 +24,7 @@ import (
     cadvisorapi "github.com/google/cadvisor/info/v1"
     cadvisorapiv2 "github.com/google/cadvisor/info/v2"
     "k8s.io/kubernetes/pkg/api"
+    "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
     "k8s.io/kubernetes/pkg/client/record"
     cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
     "k8s.io/kubernetes/pkg/kubelet/cm"
@@ -73,6 +74,7 @@ func TestRunOnce(t *testing.T) {
         containerRuntime: fakeRuntime,
         reasonCache:      NewReasonCache(),
         clock:            util.RealClock{},
+        kubeClient:       &fake.Clientset{},
     }
     kb.containerManager = cm.NewStubContainerManager()

View File

@@ -24,11 +24,27 @@ const (
     memoryResoureceName string = "Memory"
 )
 
+var (
+    // The predicateName tries to be consistent as the predicate name used in DefaultAlgorithmProvider defined in
+    // defaults.go (which tend to be stable for backward compatibility)
+    ErrDiskConflict              = newPredicateFailureError("NoDiskConflict")
+    ErrVolumeZoneConflict        = newPredicateFailureError("NoVolumeZoneConflict")
+    ErrNodeSelectorNotMatch      = newPredicateFailureError("MatchNodeSelector")
+    ErrPodNotMatchHostName       = newPredicateFailureError("HostName")
+    ErrPodNotFitsHostPorts       = newPredicateFailureError("PodFitsHostPorts")
+    ErrNodeLabelPresenceViolated = newPredicateFailureError("CheckNodeLabelPresence")
+    ErrServiceAffinityViolated   = newPredicateFailureError("CheckServiceAffinity")
+    ErrMaxVolumeCountExceeded    = newPredicateFailureError("MaxVolumeCount")
+    // ErrFakePredicateError is used for test only. The fake predicates returning false also returns error
+    // as ErrFakePredicateError.
+    ErrFakePredicateError = newPredicateFailureError("false")
+)
+
 // InsufficientResourceError is an error type that indicates what kind of resource limit is
 // hit and caused the unfitting failure.
 type InsufficientResourceError struct {
     // resourceName is the name of the resource that is insufficient
-    resourceName string
+    ResourceName string
     requested int64
     used      int64
     capacity  int64
@@ -36,7 +52,7 @@ type InsufficientResourceError struct {
 func newInsufficientResourceError(resourceName string, requested, used, capacity int64) *InsufficientResourceError {
     return &InsufficientResourceError{
-        resourceName: resourceName,
+        ResourceName: resourceName,
         requested:    requested,
         used:         used,
         capacity:     capacity,
@@ -45,5 +61,17 @@ func newInsufficientResourceError(resourceName string, requested, used, capacity
 func (e *InsufficientResourceError) Error() string {
     return fmt.Sprintf("Node didn't have enough resource: %s, requested: %d, used: %d, capacity: %d",
-        e.resourceName, e.requested, e.used, e.capacity)
+        e.ResourceName, e.requested, e.used, e.capacity)
 }
+
+type PredicateFailureError struct {
+    PredicateName string
+}
+
+func newPredicateFailureError(predicateName string) *PredicateFailureError {
+    return &PredicateFailureError{predicateName}
+}
+
+func (e *PredicateFailureError) Error() string {
+    return fmt.Sprintf("Predicate %s failed", e.PredicateName)
+}

View File

@@ -127,7 +127,7 @@ func NoDiskConflict(pod *api.Pod, nodeName string, nodeInfo *schedulercache.Node
     for _, v := range pod.Spec.Volumes {
         for _, ev := range nodeInfo.Pods() {
             if isVolumeConflict(v, ev) {
-                return false, nil
+                return false, ErrDiskConflict
             }
         }
     }
@@ -229,7 +229,8 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, nodeName string, nodeI
     numNewVolumes := len(newVolumes)
     if numExistingVolumes+numNewVolumes > c.maxVolumes {
-        return false, nil
+        // violates MaxEBSVolumeCount or MaxGCEPDVolumeCount
+        return false, ErrMaxVolumeCountExceeded
     }
 
     return true, nil
@@ -362,7 +363,7 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *s
                 nodeV, _ := nodeConstraints[k]
                 if v != nodeV {
                     glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, nodeName, pvName, k)
-                    return false, nil
+                    return false, ErrVolumeZoneConflict
                 }
             }
     }
@@ -421,20 +422,9 @@ func podName(pod *api.Pod) string {
     return pod.Namespace + "/" + pod.Name
 }
 
-// PodFitsResources calculates fit based on requested, rather than used resources
-func (r *ResourceFit) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-    info, err := r.info.GetNodeInfo(nodeName)
-    if err != nil {
-        return false, err
-    }
+func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, info *api.Node) (bool, error) {
     allocatable := info.Status.Allocatable
     allowedPodNumber := allocatable.Pods().Value()
-    if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber {
-        return false,
-            newInsufficientResourceError(podCountResourceName, 1, int64(len(nodeInfo.Pods())), allowedPodNumber)
-    }
     podRequest := getResourceRequest(pod)
     if podRequest.milliCPU == 0 && podRequest.memory == 0 {
         return true, nil
@@ -442,6 +432,7 @@ func (r *ResourceFit) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *
     totalMilliCPU := allocatable.Cpu().MilliValue()
     totalMemory := allocatable.Memory().Value()
     if totalMilliCPU < podRequest.milliCPU+nodeInfo.RequestedResource().MilliCPU {
         return false,
             newInsufficientResourceError(cpuResourceName, podRequest.milliCPU, nodeInfo.RequestedResource().MilliCPU, totalMilliCPU)
@@ -455,15 +446,30 @@ func (r *ResourceFit) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *
     return true, nil
 }
 
+func (r *NodeStatus) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+    info, err := r.info.GetNodeInfo(nodeName)
+    if err != nil {
+        return false, err
+    }
+    // TODO: move the following podNumber check to podFitsResourcesInternal when Kubelet allows podNumber check (See #20263).
+    allocatable := info.Status.Allocatable
+    allowedPodNumber := allocatable.Pods().Value()
+    if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber {
+        return false,
+            newInsufficientResourceError(podCountResourceName, 1, int64(len(nodeInfo.Pods())), allowedPodNumber)
+    }
+    return podFitsResourcesInternal(pod, nodeName, nodeInfo, info)
+}
+
 func NewResourceFitPredicate(info NodeInfo) algorithm.FitPredicate {
-    fit := &ResourceFit{
+    fit := &NodeStatus{
         info: info,
     }
     return fit.PodFitsResources
 }
 
 func NewSelectorMatchPredicate(info NodeInfo) algorithm.FitPredicate {
-    selector := &NodeSelector{
+    selector := &NodeStatus{
         info: info,
     }
     return selector.PodSelectorMatches
@@ -542,19 +548,25 @@ type NodeSelector struct {
     info NodeInfo
 }
 
-func (n *NodeSelector) PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+func (n *NodeStatus) PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
     node, err := n.info.GetNodeInfo(nodeName)
     if err != nil {
         return false, err
     }
-    return PodMatchesNodeLabels(pod, node), nil
+    if PodMatchesNodeLabels(pod, node) {
+        return true, nil
+    }
+    return false, ErrNodeSelectorNotMatch
 }
 
 func PodFitsHost(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
     if len(pod.Spec.NodeName) == 0 {
         return true, nil
     }
-    return pod.Spec.NodeName == nodeName, nil
+    if pod.Spec.NodeName == nodeName {
+        return true, nil
+    }
+    return false, ErrPodNotMatchHostName
 }
 
 type NodeLabelChecker struct {
@@ -594,7 +606,7 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, nodeName string,
     for _, label := range n.labels {
         exists = nodeLabels.Has(label)
         if (exists && !n.presence) || (!exists && n.presence) {
-            return false, nil
+            return false, ErrNodeLabelPresenceViolated
         }
     }
     return true, nil
@@ -692,7 +704,10 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, nodeName string, no
     }
 
     // check if the node matches the selector
-    return affinitySelector.Matches(labels.Set(node.Labels)), nil
+    if affinitySelector.Matches(labels.Set(node.Labels)) {
+        return true, nil
+    }
+    return false, ErrServiceAffinityViolated
 }
 
 func PodFitsHostPorts(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
@@ -706,7 +721,7 @@ func PodFitsHostPorts(pod *api.Pod, nodeName string, nodeInfo *schedulercache.No
             continue
         }
         if existingPorts[wport] {
-            return false, nil
+            return false, ErrPodNotFitsHostPorts
         }
     }
     return true, nil
@@ -735,3 +750,41 @@ func haveSame(a1, a2 []string) bool {
         }
     }
     return false
 }
+
+type NodeStatus struct {
+    info NodeInfo
+}
+
+func GeneralPredicates(info NodeInfo) algorithm.FitPredicate {
+    node := &NodeStatus{
+        info: info,
+    }
+    return node.SchedulerGeneralPredicates
+}
+
+func (n *NodeStatus) SchedulerGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+    node, err := n.info.GetNodeInfo(nodeName)
+    if err != nil {
+        return false, err
+    }
+    return RunGeneralPredicates(pod, nodeName, nodeInfo, node)
+}
+
+func RunGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, node *api.Node) (bool, error) {
+    fit, err := podFitsResourcesInternal(pod, nodeName, nodeInfo, node)
+    if !fit {
+        return fit, err
+    }
+    fit, err = PodFitsHost(pod, nodeName, nodeInfo)
+    if !fit {
+        return fit, err
+    }
+    fit, err = PodFitsHostPorts(pod, nodeName, nodeInfo)
+    if !fit {
+        return fit, err
+    }
+    if !PodMatchesNodeLabels(pod, node) {
+        return false, ErrNodeSelectorNotMatch
+    }
+    return true, nil
+}

View File

@@ -159,7 +159,7 @@ func TestPodFitsResources(t *testing.T) {
     for _, test := range enoughPodsTests {
         node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}}
-        fit := ResourceFit{FakeNodeInfo(node)}
+        fit := NodeStatus{FakeNodeInfo(node)}
         fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo)
         if !reflect.DeepEqual(err, test.wErr) {
             t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
@@ -204,7 +204,7 @@ func TestPodFitsResources(t *testing.T) {
     for _, test := range notEnoughPodsTests {
         node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1)}}
-        fit := ResourceFit{FakeNodeInfo(node)}
+        fit := NodeStatus{FakeNodeInfo(node)}
         fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo)
         if !reflect.DeepEqual(err, test.wErr) {
             t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
@@ -252,11 +252,14 @@ func TestPodFitsHost(t *testing.T) {
     for _, test := range tests {
         result, err := PodFitsHost(test.pod, test.node, schedulercache.NewNodeInfo())
-        if err != nil {
+        if !reflect.DeepEqual(err, ErrPodNotMatchHostName) && err != nil {
+            t.Errorf("unexpected error: %v", err)
+        }
+        if result == false && !reflect.DeepEqual(err, ErrPodNotMatchHostName) {
             t.Errorf("unexpected error: %v", err)
         }
         if result != test.fits {
-            t.Errorf("unexpected difference for %s: got: %v expected %v", test.test, test.fits, result)
+            t.Errorf("unexpected difference for %s: expected: %v got %v", test.test, test.fits, result)
         }
     }
 }
@@ -322,7 +325,10 @@ func TestPodFitsHostPorts(t *testing.T) {
     }
     for _, test := range tests {
         fits, err := PodFitsHostPorts(test.pod, "machine", test.nodeInfo)
-        if err != nil {
+        if !reflect.DeepEqual(err, ErrPodNotFitsHostPorts) && err != nil {
+            t.Errorf("unexpected error: %v", err)
+        }
+        if fits == false && !reflect.DeepEqual(err, ErrPodNotFitsHostPorts) {
             t.Errorf("unexpected error: %v", err)
         }
         if test.fits != fits {
@@ -404,8 +410,11 @@ func TestDiskConflicts(t *testing.T) {
     for _, test := range tests {
         ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo)
-        if err != nil {
-            t.Fatalf("unexpected error: %v", err)
+        if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil {
+            t.Errorf("unexpected error: %v", err)
+        }
+        if ok == false && !reflect.DeepEqual(err, ErrDiskConflict) {
+            t.Errorf("unexpected error: %v", err)
         }
         if test.isOk && !ok {
             t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test)
@@ -453,8 +462,11 @@ func TestAWSDiskConflicts(t *testing.T) {
     for _, test := range tests {
         ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo)
-        if err != nil {
-            t.Fatalf("unexpected error: %v", err)
+        if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil {
+            t.Errorf("unexpected error: %v", err)
+        }
+        if ok == false && !reflect.DeepEqual(err, ErrDiskConflict) {
+            t.Errorf("unexpected error: %v", err)
         }
         if test.isOk && !ok {
             t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test)
@@ -508,8 +520,11 @@ func TestRBDDiskConflicts(t *testing.T) {
     for _, test := range tests {
         ok, err := NoDiskConflict(test.pod, "machine", test.nodeInfo)
-        if err != nil {
-            t.Fatalf("unexpected error: %v", err)
+        if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil {
+            t.Errorf("unexpected error: %v", err)
+        }
+        if ok == false && !reflect.DeepEqual(err, ErrDiskConflict) {
+            t.Errorf("unexpected error: %v", err)
         }
         if test.isOk && !ok {
             t.Errorf("expected ok, got none. %v %s %s", test.pod, test.nodeInfo, test.test)
@@ -980,9 +995,12 @@ func TestPodFitsSelector(t *testing.T) {
     for _, test := range tests {
         node := api.Node{ObjectMeta: api.ObjectMeta{Labels: test.labels}}
-        fit := NodeSelector{FakeNodeInfo(node)}
+        fit := NodeStatus{FakeNodeInfo(node)}
         fits, err := fit.PodSelectorMatches(test.pod, "machine", schedulercache.NewNodeInfo())
-        if err != nil {
+        if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil {
+            t.Errorf("unexpected error: %v", err)
+        }
+        if fits == false && !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) {
             t.Errorf("unexpected error: %v", err)
         }
         if fits != test.fits {
@@ -1041,7 +1059,10 @@ func TestNodeLabelPresence(t *testing.T) {
         node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}}
         labelChecker := NodeLabelChecker{FakeNodeInfo(node), test.labels, test.presence}
         fits, err := labelChecker.CheckNodeLabelPresence(test.pod, "machine", schedulercache.NewNodeInfo())
-        if err != nil {
+        if !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) && err != nil {
+            t.Errorf("unexpected error: %v", err)
+        }
+        if fits == false && !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) {
            t.Errorf("unexpected error: %v", err)
         }
         if fits != test.fits {
@@ -1181,7 +1202,10 @@ func TestServiceAffinity(t *testing.T) {
         nodes := []api.Node{node1, node2, node3, node4, node5}
         serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels}
         fits, err := serviceAffinity.CheckServiceAffinity(test.pod, test.node, schedulercache.NewNodeInfo())
-        if err != nil {
+        if !reflect.DeepEqual(err, ErrServiceAffinityViolated) && err != nil {
+            t.Errorf("unexpected error: %v", err)
+        }
+        if fits == false && !reflect.DeepEqual(err, ErrServiceAffinityViolated) {
            t.Errorf("unexpected error: %v", err)
         }
         if fits != test.fits {
@@ -1401,7 +1425,7 @@ func TestEBSVolumeCountConflicts(t *testing.T) {
     for _, test := range tests {
         pred := NewMaxPDVolumeCountPredicate(filter, test.maxVols, pvInfo, pvcInfo)
         fits, err := pred(test.newPod, "some-node", schedulercache.NewNodeInfo(test.existingPods...))
-        if err != nil {
+        if err != nil && !reflect.DeepEqual(err, ErrMaxVolumeCountExceeded) {
             t.Errorf("unexpected error: %v", err)
         }
@@ -1455,3 +1479,83 @@ func TestPredicatesRegistered(t *testing.T) {
         }
     }
 }
+
+func newPodWithPort(hostPorts ...int) *api.Pod {
+    networkPorts := []api.ContainerPort{}
+    for _, port := range hostPorts {
+        networkPorts = append(networkPorts, api.ContainerPort{HostPort: port})
+    }
+    return &api.Pod{
+        Spec: api.PodSpec{
+            Containers: []api.Container{
+                {
+                    Ports: networkPorts,
+                },
+            },
+        },
+    }
+}
+
+func TestRunGeneralPredicates(t *testing.T) {
+    resourceTests := []struct {
+        pod      *api.Pod
+        nodeName string
+        nodeInfo *schedulercache.NodeInfo
+        node     *api.Node
+        fits     bool
+        test     string
+        wErr     error
+    }{
+        {
+            pod:      &api.Pod{},
+            nodeName: "machine1",
+            nodeInfo: schedulercache.NewNodeInfo(
+                newResourcePod(resourceRequest{milliCPU: 9, memory: 19})),
+            node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}},
+            fits: true,
+            wErr: nil,
+            test: "no resources/port/host requested always fits",
+        },
+        {
+            pod:      newResourcePod(resourceRequest{milliCPU: 8, memory: 10}),
+            nodeName: "machine1",
+            nodeInfo: schedulercache.NewNodeInfo(
+                newResourcePod(resourceRequest{milliCPU: 5, memory: 19})),
+            node: &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}},
+            fits: false,
+            wErr: newInsufficientResourceError("CPU", 8, 5, 10),
+            test: "not enough cpu resource",
+        },
+        {
+            pod: &api.Pod{
+                Spec: api.PodSpec{
+                    NodeName: "machine2",
+                },
+            },
+            nodeName: "machine1",
+            nodeInfo: schedulercache.NewNodeInfo(),
+            node:     &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}},
+            fits:     false,
+            wErr:     ErrPodNotMatchHostName,
+            test:     "host not match",
+        },
+        {
+            pod:      newPodWithPort(123),
+            nodeName: "machine1",
+            nodeInfo: schedulercache.NewNodeInfo(newPodWithPort(123)),
+            node:     &api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}},
+            fits:     false,
+            wErr:     ErrPodNotFitsHostPorts,
+            test:     "hostport conflict",
+        },
+    }
+    for _, test := range resourceTests {
+        fits, err := RunGeneralPredicates(test.pod, test.nodeName, test.nodeInfo, test.node)
+        if !reflect.DeepEqual(err, test.wErr) {
+            t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
+        }
+        if fits != test.fits {
+            t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
+        }
+    }
+}

View File

@@ -23,6 +23,7 @@ import (
 )
 
 // FitPredicate is a function that indicates if a pod fits into an existing node.
+// The failure information is given by the error.
 type FitPredicate func(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error)
 
 type PriorityFunction func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister NodeLister) (schedulerapi.HostPriorityList, error)

View File

@@ -77,19 +77,27 @@ func init() {
     // of already-installed packages required by the pod will be preferred over nodes with no already-installed
     // packages required by the pod or a small total size of already-installed packages required by the pod.
     factory.RegisterPriorityFunction("ImageLocalityPriority", priorities.ImageLocalityPriority, 1)
+    // Fit is defined based on the absence of port conflicts.
+    // This predicate is actually a default predicate, because it is invoked from
+    // predicates.GeneralPredicates()
+    factory.RegisterFitPredicate("PodFitsHostPorts", predicates.PodFitsHostPorts)
+    // Fit is determined by resource availability.
+    // This predicate is actually a default predicate, because it is invoked from
+    // predicates.GeneralPredicates()
+    factory.RegisterFitPredicateFactory(
+        "PodFitsResources",
+        func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
+            return predicates.NewResourceFitPredicate(args.NodeInfo)
+        },
+    )
+    // Fit is determined by the presence of the Host parameter and a string match
+    // This predicate is actually a default predicate, because it is invoked from
+    // predicates.GeneralPredicates()
+    factory.RegisterFitPredicate("HostName", predicates.PodFitsHost)
 }
 
 func defaultPredicates() sets.String {
     return sets.NewString(
-        // Fit is defined based on the absence of port conflicts.
-        factory.RegisterFitPredicate("PodFitsHostPorts", predicates.PodFitsHostPorts),
-        // Fit is determined by resource availability.
-        factory.RegisterFitPredicateFactory(
-            "PodFitsResources",
-            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
-                return predicates.NewResourceFitPredicate(args.NodeInfo)
-            },
-        ),
         // Fit is determined by non-conflicting disk volumes.
         factory.RegisterFitPredicate("NoDiskConflict", predicates.NoDiskConflict),
         // Fit is determined by volume zone requirements.
@@ -106,9 +114,6 @@ func defaultPredicates() sets.String {
                 return predicates.NewSelectorMatchPredicate(args.NodeInfo)
             },
         ),
-        // Fit is determined by the presence of the Host parameter and a string match
-        factory.RegisterFitPredicate("HostName", predicates.PodFitsHost),
         // Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
         factory.RegisterFitPredicateFactory(
             "MaxEBSVolumeCount",
@@ -118,7 +123,6 @@ func defaultPredicates() sets.String {
                 return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilter, maxVols, args.PVInfo, args.PVCInfo)
             },
         ),
         // Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
         factory.RegisterFitPredicateFactory(
             "MaxGCEPDVolumeCount",
@@ -128,6 +132,14 @@ func defaultPredicates() sets.String {
                 return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilter, maxVols, args.PVInfo, args.PVCInfo)
             },
         ),
+        // GeneralPredicates are the predicates that are enforced by all Kubernetes components
+        // (e.g. kubelet and all schedulers)
+        factory.RegisterFitPredicateFactory(
+            "GeneralPredicates",
+            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
+                return predicates.GeneralPredicates(args.NodeInfo)
+            },
+        ),
     )
 }

View File

@@ -84,6 +84,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
     }
 
     filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders)
     if err != nil {
         return "", err
     }
@@ -130,7 +131,7 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No
     for _, node := range nodes.Items {
         fits := true
-        for name, predicate := range predicateFuncs {
+        for _, predicate := range predicateFuncs {
             fit, err := predicate(pod, node.Name, nodeNameToInfo[node.Name])
             if err != nil {
                 switch e := err.(type) {
@@ -139,6 +140,11 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No
                         err := fmt.Errorf("got InsufficientResourceError: %v, but also fit='true' which is unexpected", e)
                         return api.NodeList{}, FailedPredicateMap{}, err
                     }
+                case *predicates.PredicateFailureError:
+                    if fit {
+                        err := fmt.Errorf("got PredicateFailureError: %v, but also fit='true' which is unexpected", e)
+                        return api.NodeList{}, FailedPredicateMap{}, err
+                    }
                 default:
                     return api.NodeList{}, FailedPredicateMap{}, err
                 }
@@ -149,10 +155,16 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No
                     failedPredicateMap[node.Name] = sets.String{}
                 }
                 if re, ok := err.(*predicates.InsufficientResourceError); ok {
-                    failedPredicateMap[node.Name].Insert(re.Error())
+                    failedPredicateMap[node.Name].Insert(fmt.Sprintf("Insufficient %s", re.ResourceName))
                     break
                 }
-                failedPredicateMap[node.Name].Insert(name)
+                if re, ok := err.(*predicates.PredicateFailureError); ok {
+                    failedPredicateMap[node.Name].Insert(re.PredicateName)
+                    break
+                } else {
+                    err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
+                    return api.NodeList{}, FailedPredicateMap{}, err
+                }
                 break
             }
         }

View File

@@ -20,19 +20,21 @@ import (
     "fmt"
     "math"
     "math/rand"
+    "reflect"
     "strconv"
     "testing"
 
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/util/sets"
     "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
+    algorithmpredicates "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
     schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
     "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
     schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
 )
 
 func falsePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-    return false, nil
+    return false, algorithmpredicates.ErrFakePredicateError
 }
 
 func truePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
@@ -40,11 +42,17 @@ func truePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeI
 }
 
 func matchesPredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-    return pod.Name == nodeName, nil
+    if pod.Name == nodeName {
+        return true, nil
+    }
+    return false, algorithmpredicates.ErrFakePredicateError
 }
 
 func hasNoPodsPredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-    return len(nodeInfo.Pods()) == 0, nil
+    if len(nodeInfo.Pods()) == 0 {
+        return true, nil
+    }
+    return false, algorithmpredicates.ErrFakePredicateError
 }
 
 func numericPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
@@ -176,6 +184,7 @@ func TestGenericScheduler(t *testing.T) {
         pods          []*api.Pod
         expectedHosts sets.String
         expectsErr    bool
+        wErr          error
     }{
         {
             predicates: map[string]algorithm.FitPredicate{"false": falsePredicate},
@@ -183,6 +192,7 @@ func TestGenericScheduler(t *testing.T) {
             nodes:      []string{"machine1", "machine2"},
             expectsErr: true,
             name:       "test 1",
+            wErr:       algorithmpredicates.ErrFakePredicateError,
         },
         {
             predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
@@ -190,6 +200,7 @@ func TestGenericScheduler(t *testing.T) {
             nodes:         []string{"machine1", "machine2"},
             expectedHosts: sets.NewString("machine1", "machine2"),
             name:          "test 2",
+            wErr:          nil,
         },
         {
             // Fits on a machine where the pod ID matches the machine name
@@ -199,6 +210,7 @@ func TestGenericScheduler(t *testing.T) {
             pod:           &api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}},
             expectedHosts: sets.NewString("machine2"),
             name:          "test 3",
+            wErr:          nil,
         },
         {
             predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
@@ -206,6 +218,7 @@ func TestGenericScheduler(t *testing.T) {
             nodes:         []string{"3", "2", "1"},
             expectedHosts: sets.NewString("3"),
             name:          "test 4",
+            wErr:          nil,
         },
         {
             predicates: map[string]algorithm.FitPredicate{"matches": matchesPredicate},
@@ -214,6 +227,7 @@ func TestGenericScheduler(t *testing.T) {
             pod:           &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
             expectedHosts: sets.NewString("2"),
             name:          "test 5",
+            wErr:          nil,
         },
         {
             predicates: map[string]algorithm.FitPredicate{"true": truePredicate},
@@ -222,6 +236,7 @@ func TestGenericScheduler(t *testing.T) {
             pod:           &api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
             expectedHosts: sets.NewString("1"),
             name:          "test 6",
+            wErr:          nil,
         },
         {
             predicates: map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate},
@@ -229,6 +244,7 @@ func TestGenericScheduler(t *testing.T) {
             nodes:      []string{"3", "2", "1"},
             expectsErr: true,
             name:       "test 7",
+            wErr:       nil,
         },
         {
             predicates: map[string]algorithm.FitPredicate{
@@ -252,20 +268,20 @@ func TestGenericScheduler(t *testing.T) {
             nodes:      []string{"1", "2"},
             expectsErr: true,
             name:       "test 8",
+            wErr:       nil,
         },
     }
 
     for _, test := range tests {
         random := rand.New(rand.NewSource(0))
         scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, random)
         machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
         if test.expectsErr {
             if err == nil {
-                t.Error("Unexpected non-error")
+                t.Errorf("Unexpected non-error at %s", test.name)
             }
         } else {
-            if err != nil {
-                t.Errorf("Unexpected error: %v", err)
+            if !reflect.DeepEqual(err, test.wErr) {
+                t.Errorf("Failed : %s, Unexpected error: %v, expected: %v", test.name, err, test.wErr)
             }
             if !test.expectedHosts.Has(machine) {
                 t.Errorf("Failed : %s, Expected: %s, Saw: %s", test.name, test.expectedHosts, machine)
@@ -312,9 +328,9 @@ func TestFindFitSomeError(t *testing.T) {
         "2": schedulercache.NewNodeInfo(),
         "1": schedulercache.NewNodeInfo(pod),
     }
 
     _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, predicates, makeNodeList(nodes), nil)
-    if err != nil {
+    if err != nil && !reflect.DeepEqual(err, algorithmpredicates.ErrFakePredicateError) {
         t.Errorf("unexpected error: %v", err)
     }
@@ -330,7 +346,7 @@ func TestFindFitSomeError(t *testing.T) {
         if !found {
             t.Errorf("failed to find node: %s in %v", node, predicateMap)
         }
-        if len(failures) != 1 || !failures.Has("match") {
+        if len(failures) != 1 || !failures.Has("false") {
             t.Errorf("unexpected failures: %v", failures)
         }
     }