Add preemption victim selector logic to scheduler

Bobby (Babak) Salamat 2017-08-02 18:20:45 -07:00
parent ad6c85ca2e
commit a4edc6c871
8 changed files with 564 additions and 10 deletions

View File

@@ -781,6 +781,7 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta interface{}, no
s.serviceAffinityMetadataProducer(pm)
pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices
}
filteredPods := nodeInfo.FilterOutPods(pods)
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
@@ -790,8 +791,8 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta interface{}, no
// Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
if len(s.labels) > len(affinityLabels) {
if len(services) > 0 {
if len(pods) > 0 {
nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(pods[0].Spec.NodeName)
if len(filteredPods) > 0 {
nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(filteredPods[0].Spec.NodeName)
if err != nil {
return false, nil, err
}

View File

@@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
"github.com/golang/glog"
)
@@ -159,6 +160,26 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
return priorityList[ix].Host, nil
}
// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns the node name if such a node is found.
// TODO(bsalamat): This function is under construction! DO NOT USE!
func (g *genericScheduler) preempt(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
nodes, err := nodeLister.List()
if err != nil {
return "", err
}
if len(nodes) == 0 {
return "", ErrNoNodesAvailable
}
nodeToPods := selectNodesForPreemption(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.predicateMetaProducer)
if len(nodeToPods) == 0 {
return "", nil
}
// TODO: Add a node scoring mechanism and perform preemption
return "", nil
}
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func findNodesThatFit(
@@ -423,6 +444,125 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf
}, nil
}
// selectNodesForPreemption finds all the nodes with possible victims for
// preemption in parallel.
func selectNodesForPreemption(pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
nodes []*v1.Node,
predicates map[string]algorithm.FitPredicate,
metadataProducer algorithm.MetadataProducer,
) map[string][]*v1.Pod {
nodeNameToPods := map[string][]*v1.Pod{}
var resultLock sync.Mutex
// We can use the same metadata producer for all nodes.
meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) {
nodeName := nodes[i].Name
pods, fits := selectVictimsOnNode(pod, meta.ShallowCopy(), nodeNameToInfo[nodeName], predicates)
if fits && len(pods) != 0 {
resultLock.Lock()
nodeNameToPods[nodeName] = pods
resultLock.Unlock()
}
}
workqueue.Parallelize(16, len(nodes), checkNode)
return nodeNameToPods
}
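selectNodesForPreemption fans out one check per node and merges the per-node results under a mutex; workqueue.Parallelize bounds the concurrency at 16 workers. A minimal standalone sketch of the same fan-out pattern using plain goroutines (checkAll and check are illustrative names, not part of this change):

package main

import (
	"fmt"
	"sync"
)

// checkAll runs check(i) for every index concurrently and records the names
// that pass, mirroring the mutex-protected result map built by
// selectNodesForPreemption.
func checkAll(names []string, check func(i int) bool) map[string]bool {
	result := map[string]bool{}
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := range names {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if check(i) {
				mu.Lock()
				result[names[i]] = true
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return result
}

func main() {
	nodes := []string{"machine1", "machine2", "machine3"}
	// Toy check: pretend only even-indexed nodes have viable victims.
	fmt.Println(checkAll(nodes, func(i int) bool { return i%2 == 0 }))
}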
// selectVictimsOnNode finds minimum set of pods on the given node that should
// be preempted in order to make enough room for "pod" to be scheduled. The
// minimum set selected is subject to the constraint that a higher-priority pod
// is never preempted when a lower-priority pod could be (higher/lower relative
// to one another, not relative to the preemptor "pod").
// The algorithm first checks if the pod can be scheduled on the node when all the
// lower priority pods are gone. If so, it sorts all the lower priority pods by
// their priority and starting from the highest priority one, tries to keep as
// many of them as possible while checking that the "pod" can still fit on the node.
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
// these predicates can be satisfied by removing more pods from the node.
func selectVictimsOnNode(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo, fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {
higherPriority := func(pod1, pod2 interface{}) bool {
return util.GetPodPriority(pod1.(*v1.Pod)) > util.GetPodPriority(pod2.(*v1.Pod))
}
potentialVictims := util.SortableList{CompFunc: higherPriority}
nodeInfoCopy := nodeInfo.Clone()
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
meta.RemovePod(rp)
}
addPod := func(ap *v1.Pod) {
nodeInfoCopy.AddPod(ap)
meta.AddPod(ap, nodeInfoCopy)
}
// As the first step, remove all the lower priority pods from the node and
// check if the given pod can be scheduled.
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
if util.GetPodPriority(p) < podPriority {
potentialVictims.Items = append(potentialVictims.Items, p)
removePod(p)
}
}
potentialVictims.Sort()
// If the new pod does not fit after removing all the lower priority pods,
// we are almost done and this node is not suitable for preemption. The only condition
// that we should check is if the "pod" is failing to schedule due to pod affinity
// failure.
if fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
if err != nil {
glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
return nil, false
}
// If the new pod still cannot be scheduled for any reason other than pod
// affinity, the new pod will not fit on this node and we are done here.
affinity := pod.Spec.Affinity
if affinity == nil || affinity.PodAffinity == nil {
return nil, false
}
for _, failedPred := range failedPredicates {
if failedPred != predicates.ErrPodAffinityNotMatch {
return nil, false
}
}
// If we reach here, it means that the pod cannot be scheduled due to pod
// affinity or anti-affinity. Since failure reason for both affinity and
// anti-affinity is the same, we cannot say which one caused it. So, we try
// adding pods one at a time and see if any of them satisfies the affinity rules.
for i, p := range potentialVictims.Items {
existingPod := p.(*v1.Pod)
addPod(existingPod)
if fits, _, _ = podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
removePod(existingPod)
} else {
// We found the pod needed to satisfy pod affinity. Let's remove it from
// potential victims list.
// NOTE: We assume that pod affinity can be satisfied by only one pod,
// not multiple pods. This is how scheduler works today.
potentialVictims.Items = append(potentialVictims.Items[:i], potentialVictims.Items[i+1:]...)
break
}
}
if !fits {
return nil, false
}
}
victims := []*v1.Pod{}
// Try to reprieve as many pods as possible, starting from the highest priority one.
for _, p := range potentialVictims.Items {
lpp := p.(*v1.Pod)
addPod(lpp)
if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
removePod(lpp)
victims = append(victims, lpp)
}
}
return victims, true
}
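Put concretely, selectVictimsOnNode simulates evicting every lower-priority pod, verifies the preemptor fits, then walks the candidates from highest priority down and reprieves any pod whose return still leaves room. A minimal standalone sketch of that reprieve loop, with integer CPU requests and a capacity check standing in for podFitsOnNode (toyPod and pickVictims are hypothetical names):

package main

import (
	"fmt"
	"sort"
)

type toyPod struct {
	name     string
	priority int32
	cpu      int
}

// pickVictims assumes every candidate has lower priority than the preemptor
// and that the preemptor fits once all of them are removed. Candidates are
// reprieved from highest priority down while the preemptor still fits.
func pickVictims(candidates []toyPod, capacity, used, preemptorCPU int) []toyPod {
	sort.SliceStable(candidates, func(i, j int) bool {
		return candidates[i].priority > candidates[j].priority
	})
	for _, p := range candidates {
		used -= p.cpu // simulate removing all lower-priority pods
	}
	victims := []toyPod{}
	for _, p := range candidates {
		if used+p.cpu+preemptorCPU <= capacity {
			used += p.cpu // reprieve: the preemptor still fits with this pod kept
		} else {
			victims = append(victims, p)
		}
	}
	return victims
}

func main() {
	pods := []toyPod{{"a", 0, 1}, {"b", 100, 2}, {"c", 200, 3}}
	// Capacity 6, all of it used by the three pods; the preemptor needs 3.
	fmt.Println(pickVictims(pods, 6, 6, 3)) // keeps "c", preempts "b" then "a"
}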
func NewGenericScheduler(
cache schedulercache.Cache,
eCache *EquivalenceCache,

View File

@@ -392,10 +392,13 @@ func makeNode(node string, milliCPU, memory int64) *v1.Node {
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
"pods": *resource.NewQuantity(100, resource.DecimalSI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
"pods": *resource.NewQuantity(100, resource.DecimalSI),
},
},
}
@@ -544,3 +547,245 @@ func TestZeroRequest(t *testing.T) {
}
}
}
func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[string][]*v1.Pod) error {
if len(expected) == len(nodeToPods) {
for k, pods := range nodeToPods {
if expPods, ok := expected[k]; ok {
if len(pods) != len(expPods) {
return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, nodeToPods)
}
prevPriority := int32(math.MaxInt32)
for _, p := range pods {
// Check that pods are sorted by their priority.
if *p.Spec.Priority > prevPriority {
return fmt.Errorf("test [%v]: pod %v of node %v was not sorted by priority", testName, p.Name, k)
}
prevPriority = *p.Spec.Priority
if _, ok := expPods[p.Name]; !ok {
return fmt.Errorf("test [%v]: pod %v was not expected. Expected: %v", testName, p.Name, expPods)
}
}
} else {
return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, nodeToPods)
}
}
} else {
return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, nodeToPods)
}
return nil
}
type FakeNodeInfo v1.Node
func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*v1.Node, error) {
node := v1.Node(n)
return &node, nil
}
func PredicateMetadata(p *v1.Pod, nodeInfo map[string]*schedulercache.NodeInfo) interface{} {
return algorithmpredicates.NewPredicateMetadataFactory(schedulertesting.FakePodLister{p})(p, nodeInfo)
}
// TestSelectNodesForPreemption tests selectNodesForPreemption. This test assumes
// that podFitsOnNode works correctly and is tested separately.
func TestSelectNodesForPreemption(t *testing.T) {
smallContainers := []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMilliCpuRequest, 10) + "m"),
"memory": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMemoryRequest, 10)),
},
},
},
}
mediumContainers := []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMilliCpuRequest*2, 10) + "m"),
"memory": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMemoryRequest*2, 10)),
},
},
},
}
largeContainers := []v1.Container{
{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"cpu": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMilliCpuRequest*3, 10) + "m"),
"memory": resource.MustParse(
strconv.FormatInt(priorityutil.DefaultMemoryRequest*3, 10)),
},
},
},
}
lowPriority, midPriority, highPriority := int32(0), int32(100), int32(1000)
tests := []struct {
name string
predicates map[string]algorithm.FitPredicate
nodes []string
pod *v1.Pod
pods []*v1.Pod
expected map[string]map[string]bool // Map from node name to the set of pod names that should be preempted.
addAffinityPredicate bool
}{
{
name: "a pod that does not fit on any machine",
predicates: map[string]algorithm.FitPredicate{"matches": falsePredicate},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "new"}, Spec: v1.PodSpec{Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{},
},
{
name: "a pod that fits with no preemption",
predicates: map[string]algorithm.FitPredicate{"matches": truePredicate},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "new"}, Spec: v1.PodSpec{Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{},
},
{
name: "a pod that fits on one machine with no preemption",
predicates: map[string]algorithm.FitPredicate{"matches": matchesPredicate},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{},
},
{
name: "a pod that fits on both machines when lower priority pods are preempted",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"a": true}, "machine2": {"b": true}},
},
{
name: "a pod that would fit on the machines, but other pods running are higher priority",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &lowPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{},
},
{
name: "medium priority pod is preempted, but lower priority one stays as it is small",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"b": true}, "machine2": {"c": true}},
},
{
name: "mixed priority pods are preempted",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"b": true, "c": true}},
},
{
name: "lower priority pod is not preempted to satisfy pod affinity",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
TopologyKey: "hostname",
},
},
}}}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"b": true, "c": true}},
addAffinityPredicate: true,
},
{
name: "between two pods that satisfy affinity, the higher priority one stays",
predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
nodes: []string{"machine1", "machine2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
TopologyKey: "hostname",
},
},
}}}},
pods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "b", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}},
expected: map[string]map[string]bool{"machine1": {"a": true, "c": true}},
addAffinityPredicate: true,
},
}
for _, test := range tests {
nodes := []*v1.Node{}
for _, n := range test.nodes {
node := makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5)
node.ObjectMeta.Labels = map[string]string{"hostname": node.Name}
nodes = append(nodes, node)
}
if test.addAffinityPredicate {
test.predicates["affinity"] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
nodeToPods := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata)
if err := checkPreemptionVictims(test.name, test.expected, nodeToPods); err != nil {
t.Error(err)
}
}
}

View File

@@ -193,7 +193,7 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) {
n = NewNodeInfo()
cache.nodes[pod.Spec.NodeName] = n
}
n.addPod(pod)
n.AddPod(pod)
}
// Assumes that lock is already acquired.
@@ -208,7 +208,7 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
// Assumes that lock is already acquired.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
n := cache.nodes[pod.Spec.NodeName]
if err := n.removePod(pod); err != nil {
if err := n.RemovePod(pod); err != nil {
return err
}
if len(n.pods) == 0 && n.node == nil {

View File

@@ -162,7 +162,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
usedPorts: make(map[int]bool),
}
for _, pod := range pods {
ni.addPod(pod)
ni.AddPod(pod)
}
return ni
}
@@ -294,8 +294,8 @@ func hasPodAffinityConstraints(pod *v1.Pod) bool {
return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
}
// addPod adds pod information to this NodeInfo.
func (n *NodeInfo) addPod(pod *v1.Pod) {
// AddPod adds pod information to this NodeInfo.
func (n *NodeInfo) AddPod(pod *v1.Pod) {
res, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU += res.MilliCPU
n.requestedResource.Memory += res.Memory
@@ -320,8 +320,8 @@ func (n *NodeInfo) addPod(pod *v1.Pod) {
n.generation++
}
// removePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) removePod(pod *v1.Pod) error {
// RemovePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
k1, err := getPodKey(pod)
if err != nil {
return err
@@ -441,6 +441,37 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error {
return nil
}
// FilterOutPods receives a list of pods and filters out those whose node names
// are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
//
// Preemption logic simulates removal of pods on a node by removing them from the
// corresponding NodeInfo. In order for the simulation to work, we call this method
// on the pods returned from SchedulerCache, so that predicate functions see
// only the pods that are not removed from the NodeInfo.
func (n *NodeInfo) FilterOutPods(pods []*v1.Pod) []*v1.Pod {
node := n.Node()
if node == nil {
return pods
}
filtered := make([]*v1.Pod, 0, len(pods))
for _, p := range pods {
if p.Spec.NodeName == node.Name {
// If pod is on the given node, add it to 'filtered' only if it is present in nodeInfo.
podKey, _ := getPodKey(p)
for _, np := range n.Pods() {
npodkey, _ := getPodKey(np)
if npodkey == podKey {
filtered = append(filtered, p)
break
}
}
} else {
filtered = append(filtered, p)
}
}
return filtered
}
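FilterOutPods keeps a pod that claims to run on this node only while the NodeInfo still tracks it, and always keeps pods bound to other nodes. A minimal standalone sketch of that rule using a set of tracked pod keys (toyPod and filterOutPods are illustrative names, not the real types):

package main

import "fmt"

type toyPod struct {
	key      string // namespace/name, analogous to getPodKey
	nodeName string
}

// filterOutPods mimics NodeInfo.FilterOutPods for a node called nodeName
// whose NodeInfo currently tracks the pod keys in tracked.
func filterOutPods(nodeName string, tracked map[string]bool, pods []toyPod) []toyPod {
	filtered := make([]toyPod, 0, len(pods))
	for _, p := range pods {
		if p.nodeName != nodeName || tracked[p.key] {
			filtered = append(filtered, p)
		}
	}
	return filtered
}

func main() {
	tracked := map[string]bool{"default/a": true} // "default/b" was removed in simulation
	pods := []toyPod{
		{"default/a", "machine1"}, // on this node and still tracked: kept
		{"default/b", "machine1"}, // on this node but simulated as preempted: dropped
		{"default/c", "machine2"}, // bound to another node: kept
	}
	fmt.Println(filterOutPods("machine1", tracked, pods))
}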
// getPodKey returns the string key of a pod.
func getPodKey(pod *v1.Pod) (string, error) {
return clientcache.MetaNamespaceKeyFunc(pod)

View File

@@ -27,7 +27,7 @@ func CreateNodeNameToInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*NodeI
if _, ok := nodeNameToInfo[nodeName]; !ok {
nodeNameToInfo[nodeName] = NewNodeInfo()
}
nodeNameToInfo[nodeName].addPod(pod)
nodeNameToInfo[nodeName].AddPod(pod)
}
for _, node := range nodes {
if _, ok := nodeNameToInfo[node.Name]; !ok {

View File

@@ -17,7 +17,10 @@ limitations under the License.
package util
import (
"sort"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/apis/scheduling"
)
// GetUsedPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair
@@ -46,3 +49,42 @@ func GetPodFullName(pod *v1.Pod) string {
// (DNS subdomain format).
return pod.Name + "_" + pod.Namespace
}
// GetPodPriority returns the priority of the given pod.
func GetPodPriority(pod *v1.Pod) int32 {
if pod.Spec.Priority != nil {
return *pod.Spec.Priority
}
// When the priority of a running pod is nil, it means it was created at a time
// when there was no global default priority class and the priority class
// name of the pod was empty. So, we resolve to the static default priority.
return scheduling.DefaultPriorityWhenNoDefaultClassExists
}
// SortableList is a list that implements sort.Interface.
type SortableList struct {
Items []interface{}
CompFunc LessFunc
}
// LessFunc is a function that receives two items and returns true if the first
// item should be placed before the second when the list is sorted.
type LessFunc func(item1, item2 interface{}) bool
var _ = sort.Interface(&SortableList{})
func (l *SortableList) Len() int { return len(l.Items) }
func (l *SortableList) Less(i, j int) bool {
return l.CompFunc(l.Items[i], l.Items[j])
}
func (l *SortableList) Swap(i, j int) {
l.Items[i], l.Items[j] = l.Items[j], l.Items[i]
}
// Sort sorts the items in the list using the given CompFunc. Item1 is placed
// before Item2 when CompFunc(Item1, Item2) returns true.
func (l *SortableList) Sort() {
sort.Sort(l)
}

View File

@@ -0,0 +1,95 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/apis/scheduling"
)
// TestGetPodPriority tests GetPodPriority function.
func TestGetPodPriority(t *testing.T) {
p := int32(20)
tests := []struct {
name string
pod *v1.Pod
expectedPriority int32
}{
{
name: "no priority pod resolves to static default priority",
pod: &v1.Pod{
Spec: v1.PodSpec{Containers: []v1.Container{
{Name: "container", Image: "image"}},
},
},
expectedPriority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
},
{
name: "pod with priority resolves correctly",
pod: &v1.Pod{
Spec: v1.PodSpec{Containers: []v1.Container{
{Name: "container", Image: "image"}},
Priority: &p,
},
},
expectedPriority: p,
},
}
for _, test := range tests {
if GetPodPriority(test.pod) != test.expectedPriority {
t.Errorf("expected pod priority: %v, got %v", test.expectedPriority, GetPodPriority(test.pod))
}
}
}
// TestSortableList tests SortableList by storing pods in the list and sorting
// them by their priority.
func TestSortableList(t *testing.T) {
higherPriority := func(pod1, pod2 interface{}) bool {
return GetPodPriority(pod1.(*v1.Pod)) > GetPodPriority(pod2.(*v1.Pod))
}
podList := SortableList{CompFunc: higherPriority}
// Add a few Pods with different priorities from lowest to highest priority.
for i := 0; i < 10; i++ {
var p int32 = int32(i)
pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: "image",
},
},
Priority: &p,
},
}
podList.Items = append(podList.Items, pod)
}
podList.Sort()
if len(podList.Items) != 10 {
t.Errorf("expected length of list was 10, got: %v", len(podList.Items))
}
var prevPriority = int32(10)
for _, p := range podList.Items {
if *p.(*v1.Pod).Spec.Priority >= prevPriority {
t.Errorf("Pods are not sorted. Current pod priority is %v, while previous one was %v.", *p.(*v1.Pod).Spec.Priority, prevPriority)
}
prevPriority = *p.(*v1.Pod).Spec.Priority
}
}