diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index dc811b8e80f..c9f55181412 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -781,6 +781,7 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta interface{}, no s.serviceAffinityMetadataProducer(pm) pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices } + filteredPods := nodeInfo.FilterOutPods(pods) node := nodeInfo.Node() if node == nil { return false, nil, fmt.Errorf("node not found") @@ -790,8 +791,8 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta interface{}, no // Step 1: If we don't have all constraints, introspect nodes to find the missing constraints. if len(s.labels) > len(affinityLabels) { if len(services) > 0 { - if len(pods) > 0 { - nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(pods[0].Spec.NodeName) + if len(filteredPods) > 0 { + nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(filteredPods[0].Spec.NodeName) if err != nil { return false, nil, err } diff --git a/plugin/pkg/scheduler/core/generic_scheduler.go b/plugin/pkg/scheduler/core/generic_scheduler.go index a9244f6636e..bb635739ad4 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler.go +++ b/plugin/pkg/scheduler/core/generic_scheduler.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + "k8s.io/kubernetes/plugin/pkg/scheduler/util" "github.com/golang/glog" ) @@ -159,6 +160,26 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList return priorityList[ix].Host, nil } +// preempt finds nodes with pods that can be preempted to make room for "pod" to +// schedule. It chooses one of the nodes and preempts the pods on the node and +// returns the node name if such a node is found. +// TODO(bsalamat): This function is under construction! DO NOT USE! +func (g *genericScheduler) preempt(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) { + nodes, err := nodeLister.List() + if err != nil { + return "", err + } + if len(nodes) == 0 { + return "", ErrNoNodesAvailable + } + nodeToPods := selectNodesForPreemption(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.predicateMetaProducer) + if len(nodeToPods) == 0 { + return "", nil + } + // TODO: Add a node scoring mechanism and perform preemption + return "", nil +} + // Filters the nodes to find the ones that fit based on the given predicate functions // Each node is passed through the predicate functions to determine if it is a fit func findNodesThatFit( @@ -423,6 +444,125 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf }, nil } +// selectNodesForPreemption finds all the nodes with possible victims for +// preemption in parallel. +func selectNodesForPreemption(pod *v1.Pod, + nodeNameToInfo map[string]*schedulercache.NodeInfo, + nodes []*v1.Node, + predicates map[string]algorithm.FitPredicate, + metadataProducer algorithm.MetadataProducer, +) map[string][]*v1.Pod { + + nodeNameToPods := map[string][]*v1.Pod{} + var resultLock sync.Mutex + + // We can use the same metadata producer for all nodes. 
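+	// Each parallel checkNode call below hands selectVictimsOnNode its own
+	// meta.ShallowCopy(), so simulated pod removals on one node do not leak into
+	// the evaluation of other nodes.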
+ meta := metadataProducer(pod, nodeNameToInfo) + checkNode := func(i int) { + nodeName := nodes[i].Name + pods, fits := selectVictimsOnNode(pod, meta.ShallowCopy(), nodeNameToInfo[nodeName], predicates) + if fits && len(pods) != 0 { + resultLock.Lock() + nodeNameToPods[nodeName] = pods + resultLock.Unlock() + } + } + workqueue.Parallelize(16, len(nodes), checkNode) + return nodeNameToPods +} + +// selectVictimsOnNode finds minimum set of pods on the given node that should +// be preempted in order to make enough room for "pod" to be scheduled. The +// minimum set selected is subject to the constraint that a higher-priority pod +// is never preempted when a lower-priority pod could be (higher/lower relative +// to one another, not relative to the preemptor "pod"). +// The algorithm first checks if the pod can be scheduled on the node when all the +// lower priority pods are gone. If so, it sorts all the lower priority pods by +// their priority and starting from the highest priority one, tries to keep as +// many of them as possible while checking that the "pod" can still fit on the node. +// NOTE: This function assumes that it is never called if "pod" cannot be scheduled +// due to pod affinity, node affinity, or node anti-affinity reasons. None of +// these predicates can be satisfied by removing more pods from the node. +func selectVictimsOnNode(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo, fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) { + higherPriority := func(pod1, pod2 interface{}) bool { + return util.GetPodPriority(pod1.(*v1.Pod)) > util.GetPodPriority(pod2.(*v1.Pod)) + } + potentialVictims := util.SortableList{CompFunc: higherPriority} + nodeInfoCopy := nodeInfo.Clone() + + removePod := func(rp *v1.Pod) { + nodeInfoCopy.RemovePod(rp) + meta.RemovePod(rp) + } + addPod := func(ap *v1.Pod) { + nodeInfoCopy.AddPod(ap) + meta.AddPod(ap, nodeInfoCopy) + } + // As the first step, remove all the lower priority pods from the node and + // check if the given pod can be scheduled. + podPriority := util.GetPodPriority(pod) + for _, p := range nodeInfoCopy.Pods() { + if util.GetPodPriority(p) < podPriority { + potentialVictims.Items = append(potentialVictims.Items, p) + removePod(p) + } + } + potentialVictims.Sort() + // If the new pod does not fit after removing all the lower priority pods, + // we are almost done and this node is not suitable for preemption. The only condition + // that we should check is if the "pod" is failing to schedule due to pod affinity + // failure. + if fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits { + if err != nil { + glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) + return nil, false + } + // If the new pod still cannot be scheduled for any reason other than pod + // affinity, the new pod will not fit on this node and we are done here. + affinity := pod.Spec.Affinity + if affinity == nil || affinity.PodAffinity == nil { + return nil, false + } + for _, failedPred := range failedPredicates { + if failedPred != predicates.ErrPodAffinityNotMatch { + return nil, false + } + } + // If we reach here, it means that the pod cannot be scheduled due to pod + // affinity or anti-affinity. Since failure reason for both affinity and + // anti-affinity is the same, we cannot say which one caused it. So, we try + // adding pods one at a time and see if any of them satisfies the affinity rules. 
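+		// potentialVictims is sorted with the highest-priority pod first, so the
+		// first pod that makes "pod" schedulable again is also the highest-priority
+		// pod we can reprieve.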
+ for i, p := range potentialVictims.Items { + existingPod := p.(*v1.Pod) + addPod(existingPod) + if fits, _, _ = podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits { + removePod(existingPod) + } else { + // We found the pod needed to satisfy pod affinity. Let's remove it from + // potential victims list. + // NOTE: We assume that pod affinity can be satisfied by only one pod, + // not multiple pods. This is how scheduler works today. + potentialVictims.Items = append(potentialVictims.Items[:i], potentialVictims.Items[i+1:]...) + break + } + } + if !fits { + return nil, false + } + } + victims := []*v1.Pod{} + // Try to reprieve as may pods as possible starting from the highest priority one. + for _, p := range potentialVictims.Items { + lpp := p.(*v1.Pod) + addPod(lpp) + if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits { + removePod(lpp) + victims = append(victims, lpp) + } + } + return victims, true +} + func NewGenericScheduler( cache schedulercache.Cache, eCache *EquivalenceCache, diff --git a/plugin/pkg/scheduler/core/generic_scheduler_test.go b/plugin/pkg/scheduler/core/generic_scheduler_test.go index f4abcaa5057..bbd95f85cb9 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/core/generic_scheduler_test.go @@ -392,10 +392,13 @@ func makeNode(node string, milliCPU, memory int64) *v1.Node { Capacity: v1.ResourceList{ v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), + "pods": *resource.NewQuantity(100, resource.DecimalSI), }, Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), + "pods": *resource.NewQuantity(100, resource.DecimalSI), }, }, } @@ -544,3 +547,245 @@ func TestZeroRequest(t *testing.T) { } } } + +func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[string][]*v1.Pod) error { + if len(expected) == len(nodeToPods) { + for k, pods := range nodeToPods { + if expPods, ok := expected[k]; ok { + if len(pods) != len(expPods) { + return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, nodeToPods) + } + prevPriority := int32(math.MaxInt32) + for _, p := range pods { + // Check that pods are sorted by their priority. + if *p.Spec.Priority > prevPriority { + return fmt.Errorf("test [%v]: pod %v of node %v was not sorted by priority", testName, p.Name, k) + } + prevPriority = *p.Spec.Priority + if _, ok := expPods[p.Name]; !ok { + return fmt.Errorf("test [%v]: pod %v was not expected. Expected: %v", testName, p.Name, expPods) + } + } + } else { + return fmt.Errorf("test [%v]: unexpected machines. expected: %v, got: %v", testName, expected, nodeToPods) + } + } + } else { + return fmt.Errorf("test [%v]: unexpected number of machines. expected: %v, got: %v", testName, expected, nodeToPods) + } + return nil +} + +type FakeNodeInfo v1.Node + +func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*v1.Node, error) { + node := v1.Node(n) + return &node, nil +} + +func PredicateMetadata(p *v1.Pod, nodeInfo map[string]*schedulercache.NodeInfo) interface{} { + return algorithmpredicates.NewPredicateMetadataFactory(schedulertesting.FakePodLister{p})(p, nodeInfo) +} + +// TestSelectNodesForPreemption tests selectNodesForPreemption. This test assumes +// that podsFitsOnNode works correctly and is tested separately. 
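+// The "expected" field of each test case maps a node name to the set of pod
+// names expected to be selected as victims on that node.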
+func TestSelectNodesForPreemption(t *testing.T) { + smallContainers := []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "cpu": resource.MustParse( + strconv.FormatInt(priorityutil.DefaultMilliCpuRequest, 10) + "m"), + "memory": resource.MustParse( + strconv.FormatInt(priorityutil.DefaultMemoryRequest, 10)), + }, + }, + }, + } + mediumContainers := []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "cpu": resource.MustParse( + strconv.FormatInt(priorityutil.DefaultMilliCpuRequest*2, 10) + "m"), + "memory": resource.MustParse( + strconv.FormatInt(priorityutil.DefaultMemoryRequest*2, 10)), + }, + }, + }, + } + largeContainers := []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "cpu": resource.MustParse( + strconv.FormatInt(priorityutil.DefaultMilliCpuRequest*3, 10) + "m"), + "memory": resource.MustParse( + strconv.FormatInt(priorityutil.DefaultMemoryRequest*3, 10)), + }, + }, + }, + } + lowPriority, midPriority, highPriority := int32(0), int32(100), int32(1000) + tests := []struct { + name string + predicates map[string]algorithm.FitPredicate + nodes []string + pod *v1.Pod + pods []*v1.Pod + expected map[string]map[string]bool // Map from node name to a list of pods names which should be preempted. + addAffinityPredicate bool + }{ + { + name: "a pod that does not fit on any machine", + predicates: map[string]algorithm.FitPredicate{"matches": falsePredicate}, + nodes: []string{"machine1", "machine2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "new"}, Spec: v1.PodSpec{Priority: &highPriority}}, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}}, + expected: map[string]map[string]bool{}, + }, + { + name: "a pod that fits with no preemption", + predicates: map[string]algorithm.FitPredicate{"matches": truePredicate}, + nodes: []string{"machine1", "machine2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "new"}, Spec: v1.PodSpec{Priority: &highPriority}}, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}}, + expected: map[string]map[string]bool{}, + }, + { + name: "a pod that fits on one machine with no preemption", + predicates: map[string]algorithm.FitPredicate{"matches": matchesPredicate}, + nodes: []string{"machine1", "machine2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Priority: &highPriority}}, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Priority: &midPriority, NodeName: "machine2"}}}, + expected: map[string]map[string]bool{}, + }, + { + name: "a pod that fits on both machines when lower priority pods are preempted", + predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, + nodes: []string{"machine1", "machine2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}}, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: 
&midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}}, + expected: map[string]map[string]bool{"machine1": {"a": true}, "machine2": {"b": true}}, + }, + { + name: "a pod that would fit on the machines, but other pods running are higher priority", + predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, + nodes: []string{"machine1", "machine2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &lowPriority}}, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}}, + expected: map[string]map[string]bool{}, + }, + { + name: "medium priority pod is preempted, but lower priority one stays as it is small", + predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, + nodes: []string{"machine1", "machine2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}}, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &midPriority, NodeName: "machine2"}}}, + expected: map[string]map[string]bool{"machine1": {"b": true}, "machine2": {"c": true}}, + }, + { + name: "mixed priority pods are preempted", + predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, + nodes: []string{"machine1", "machine2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority}}, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}}, + expected: map[string]map[string]bool{"machine1": {"b": true, "c": true}}, + }, + { + name: "lower priority pod is not preempted to satisfy pod affinity", + predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, + nodes: []string{"machine1", "machine2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, Affinity: &v1.Affinity{ + PodAffinity: &v1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + 
MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"securityscan", "value2"}, + }, + }, + }, + TopologyKey: "hostname", + }, + }, + }}}}, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}}, + expected: map[string]map[string]bool{"machine1": {"b": true, "c": true}}, + addAffinityPredicate: true, + }, + { + name: "between two pods that satisfy affinity, the higher priority one stays", + predicates: map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, + nodes: []string{"machine1", "machine2"}, + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, Affinity: &v1.Affinity{ + PodAffinity: &v1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "service", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"securityscan", "value2"}, + }, + }, + }, + TopologyKey: "hostname", + }, + }, + }}}}, + pods: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "a", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &lowPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "b", Labels: map[string]string{"service": "securityscan"}}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Containers: mediumContainers, Priority: &midPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "d"}, Spec: v1.PodSpec{Containers: smallContainers, Priority: &highPriority, NodeName: "machine1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "e"}, Spec: v1.PodSpec{Containers: largeContainers, Priority: &highPriority, NodeName: "machine2"}}}, + expected: map[string]map[string]bool{"machine1": {"a": true, "c": true}}, + addAffinityPredicate: true, + }, + } + + for _, test := range tests { + nodes := []*v1.Node{} + for _, n := range test.nodes { + node := makeNode(n, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5) + node.ObjectMeta.Labels = map[string]string{"hostname": node.Name} + nodes = append(nodes, node) + } + if test.addAffinityPredicate { + test.predicates["affinity"] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods)) + } + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes) + nodeToPods := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata) + if err := checkPreemptionVictims(test.name, test.expected, nodeToPods); err != nil { + 
t.Error(err) + } + } +} diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go index be436f7b7e2..0f562e25925 100644 --- a/plugin/pkg/scheduler/schedulercache/cache.go +++ b/plugin/pkg/scheduler/schedulercache/cache.go @@ -193,7 +193,7 @@ func (cache *schedulerCache) addPod(pod *v1.Pod) { n = NewNodeInfo() cache.nodes[pod.Spec.NodeName] = n } - n.addPod(pod) + n.AddPod(pod) } // Assumes that lock is already acquired. @@ -208,7 +208,7 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error { // Assumes that lock is already acquired. func (cache *schedulerCache) removePod(pod *v1.Pod) error { n := cache.nodes[pod.Spec.NodeName] - if err := n.removePod(pod); err != nil { + if err := n.RemovePod(pod); err != nil { return err } if len(n.pods) == 0 && n.node == nil { diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index dd3f8206b09..4192aea7264 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -162,7 +162,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo { usedPorts: make(map[int]bool), } for _, pod := range pods { - ni.addPod(pod) + ni.AddPod(pod) } return ni } @@ -294,8 +294,8 @@ func hasPodAffinityConstraints(pod *v1.Pod) bool { return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil) } -// addPod adds pod information to this NodeInfo. -func (n *NodeInfo) addPod(pod *v1.Pod) { +// AddPod adds pod information to this NodeInfo. +func (n *NodeInfo) AddPod(pod *v1.Pod) { res, non0_cpu, non0_mem := calculateResource(pod) n.requestedResource.MilliCPU += res.MilliCPU n.requestedResource.Memory += res.Memory @@ -320,8 +320,8 @@ func (n *NodeInfo) addPod(pod *v1.Pod) { n.generation++ } -// removePod subtracts pod information to this NodeInfo. -func (n *NodeInfo) removePod(pod *v1.Pod) error { +// RemovePod subtracts pod information to this NodeInfo. +func (n *NodeInfo) RemovePod(pod *v1.Pod) error { k1, err := getPodKey(pod) if err != nil { return err @@ -441,6 +441,37 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error { return nil } +// FilterOutPods receives a list of pods and filters out those whose node names +// are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo. +// +// Preemption logic simulates removal of pods on a node by removing them from the +// corresponding NodeInfo. In order for the simulation to work, we call this method +// on the pods returned from SchedulerCache, so that predicate functions see +// only the pods that are not removed from the NodeInfo. +func (n *NodeInfo) FilterOutPods(pods []*v1.Pod) []*v1.Pod { + node := n.Node() + if node == nil { + return pods + } + filtered := make([]*v1.Pod, 0, len(pods)) + for _, p := range pods { + if p.Spec.NodeName == node.Name { + // If pod is on the given node, add it to 'filtered' only if it is present in nodeInfo. + podKey, _ := getPodKey(p) + for _, np := range n.Pods() { + npodkey, _ := getPodKey(np) + if npodkey == podKey { + filtered = append(filtered, p) + break + } + } + } else { + filtered = append(filtered, p) + } + } + return filtered +} + // getPodKey returns the string key of a pod. 
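+// FilterOutPods above uses this key to match candidate pods against the pods
+// tracked by the NodeInfo.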
func getPodKey(pod *v1.Pod) (string, error) { return clientcache.MetaNamespaceKeyFunc(pod) diff --git a/plugin/pkg/scheduler/schedulercache/util.go b/plugin/pkg/scheduler/schedulercache/util.go index dc3c28a6e30..e2ac2d90777 100644 --- a/plugin/pkg/scheduler/schedulercache/util.go +++ b/plugin/pkg/scheduler/schedulercache/util.go @@ -27,7 +27,7 @@ func CreateNodeNameToInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*NodeI if _, ok := nodeNameToInfo[nodeName]; !ok { nodeNameToInfo[nodeName] = NewNodeInfo() } - nodeNameToInfo[nodeName].addPod(pod) + nodeNameToInfo[nodeName].AddPod(pod) } for _, node := range nodes { if _, ok := nodeNameToInfo[node.Name]; !ok { diff --git a/plugin/pkg/scheduler/util/utils.go b/plugin/pkg/scheduler/util/utils.go index a230cf55fa6..2bf10adf6b1 100644 --- a/plugin/pkg/scheduler/util/utils.go +++ b/plugin/pkg/scheduler/util/utils.go @@ -17,7 +17,10 @@ limitations under the License. package util import ( + "sort" + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/apis/scheduling" ) // GetUsedPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair @@ -46,3 +49,42 @@ func GetPodFullName(pod *v1.Pod) string { // (DNS subdomain format). return pod.Name + "_" + pod.Namespace } + +// GetPodPriority return priority of the given pod. +func GetPodPriority(pod *v1.Pod) int32 { + if pod.Spec.Priority != nil { + return *pod.Spec.Priority + } + // When priority of a running pod is nil, it means it was created at a time + // that there was no global default priority class and the priority class + // name of the pod was empty. So, we resolve to the static default priority. + return scheduling.DefaultPriorityWhenNoDefaultClassExists +} + +// SortableList is a list that implements sort.Interface. +type SortableList struct { + Items []interface{} + CompFunc LessFunc +} + +// LessFunc is a function that receives two Pods and returns true if the first +// pod should be placed before pod2 when the list is sorted. +type LessFunc func(item1, item2 interface{}) bool + +var _ = sort.Interface(&SortableList{}) + +func (l *SortableList) Len() int { return len(l.Items) } + +func (l *SortableList) Less(i, j int) bool { + return l.CompFunc(l.Items[i], l.Items[j]) +} + +func (l *SortableList) Swap(i, j int) { + l.Items[i], l.Items[j] = l.Items[j], l.Items[i] +} + +// Sort sorts the items in the list using the given CompFunc. Item1 is placed +// before Item2 when CompFunc(Item1, Item2) returns true. +func (l *SortableList) Sort() { + sort.Sort(l) +} diff --git a/plugin/pkg/scheduler/util/utils_test.go b/plugin/pkg/scheduler/util/utils_test.go new file mode 100644 index 00000000000..653c3b9b0d6 --- /dev/null +++ b/plugin/pkg/scheduler/util/utils_test.go @@ -0,0 +1,95 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/apis/scheduling" +) + +// TestGetPodPriority tests GetPodPriority function. 
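+// A pod with a nil Priority is expected to fall back to the static default,
+// scheduling.DefaultPriorityWhenNoDefaultClassExists.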
+func TestGetPodPriority(t *testing.T) {
+	p := int32(20)
+	tests := []struct {
+		name             string
+		pod              *v1.Pod
+		expectedPriority int32
+	}{
+		{
+			name: "no priority pod resolves to static default priority",
+			pod: &v1.Pod{
+				Spec: v1.PodSpec{Containers: []v1.Container{
+					{Name: "container", Image: "image"}},
+				},
+			},
+			expectedPriority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
+		},
+		{
+			name: "pod with priority resolves correctly",
+			pod: &v1.Pod{
+				Spec: v1.PodSpec{Containers: []v1.Container{
+					{Name: "container", Image: "image"}},
+					Priority: &p,
+				},
+			},
+			expectedPriority: p,
+		},
+	}
+	for _, test := range tests {
+		if GetPodPriority(test.pod) != test.expectedPriority {
+			t.Errorf("expected pod priority: %v, got %v", test.expectedPriority, GetPodPriority(test.pod))
+		}
+	}
+}
+
+// TestSortableList tests SortableList by storing pods in the list and sorting
+// them by their priority.
+func TestSortableList(t *testing.T) {
+	higherPriority := func(pod1, pod2 interface{}) bool {
+		return GetPodPriority(pod1.(*v1.Pod)) > GetPodPriority(pod2.(*v1.Pod))
+	}
+	podList := SortableList{CompFunc: higherPriority}
+	// Add a few Pods with different priorities from lowest to highest priority.
+	for i := 0; i < 10; i++ {
+		var p int32 = int32(i)
+		pod := &v1.Pod{
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name:  "container",
+						Image: "image",
+					},
+				},
+				Priority: &p,
+			},
+		}
+		podList.Items = append(podList.Items, pod)
+	}
+	podList.Sort()
+	if len(podList.Items) != 10 {
+		t.Errorf("expected length of list was 10, got: %v", len(podList.Items))
+	}
+	var prevPriority = int32(10)
+	for _, p := range podList.Items {
+		if *p.(*v1.Pod).Spec.Priority >= prevPriority {
+			t.Errorf("Pods are not sorted. Current pod priority is %v, while previous one was %v.", *p.(*v1.Pod).Spec.Priority, prevPriority)
+		}
+		prevPriority = *p.(*v1.Pod).Spec.Priority
+	}
+}
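
Reviewer note: below is a minimal sketch, not part of the patch, of how the new util helpers compose. It assumes a build against this branch so that util.GetPodPriority and util.SortableList (added above) resolve; the pod names and priority values are invented for illustration. The comparator is the same descending-priority ordering selectVictimsOnNode relies on when collecting and reprieving victims.

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/util"
)

func main() {
	low, mid, high := int32(0), int32(100), int32(1000)
	pods := []*v1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "b"}, Spec: v1.PodSpec{Priority: &low}},
		{ObjectMeta: metav1.ObjectMeta{Name: "a"}, Spec: v1.PodSpec{Priority: &mid}},
		{ObjectMeta: metav1.ObjectMeta{Name: "c"}, Spec: v1.PodSpec{Priority: &high}},
	}

	// Same comparator used in selectVictimsOnNode: higher priority sorts first.
	higherPriority := func(p1, p2 interface{}) bool {
		return util.GetPodPriority(p1.(*v1.Pod)) > util.GetPodPriority(p2.(*v1.Pod))
	}

	victims := util.SortableList{CompFunc: higherPriority}
	for _, p := range pods {
		victims.Items = append(victims.Items, p)
	}
	victims.Sort()

	// Prints c(1000) a(100) b(0): victims are later reprieved starting from the
	// highest-priority entry, so lower-priority pods are the last to be spared.
	for _, item := range victims.Items {
		p := item.(*v1.Pod)
		fmt.Printf("%s(%d) ", p.Name, util.GetPodPriority(p))
	}
	fmt.Println()
}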