diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
index 0e0af32c17f..d5b48c4a479 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -836,8 +836,7 @@ func PodFitsHostPorts(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.No
 		return true, nil, nil
 	}
 
-	// TODO: Aggregate it at the NodeInfo level.
-	existingPorts := GetUsedPorts(nodeInfo.Pods()...)
+	existingPorts := nodeInfo.UsedPorts()
 	for wport := range wantPorts {
 		if wport != 0 && existingPorts[wport] {
 			return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
diff --git a/plugin/pkg/scheduler/schedulercache/cache_test.go b/plugin/pkg/scheduler/schedulercache/cache_test.go
index 19574c80c5c..eb1f5abfd0f 100644
--- a/plugin/pkg/scheduler/schedulercache/cache_test.go
+++ b/plugin/pkg/scheduler/schedulercache/cache_test.go
@@ -67,6 +67,7 @@ func TestAssumePodScheduled(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[0]},
+			usedPorts:           map[int]bool{80: true},
 		},
 	}, {
 		pods: []*v1.Pod{testPods[1], testPods[2]},
@@ -81,6 +82,7 @@ func TestAssumePodScheduled(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[1], testPods[2]},
+			usedPorts:           map[int]bool{80: true, 8080: true},
 		},
 	}, { // test non-zero request
 		pods: []*v1.Pod{testPods[3]},
@@ -95,6 +97,7 @@ func TestAssumePodScheduled(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[3]},
+			usedPorts:           map[int]bool{80: true},
 		},
 	}}
 
@@ -169,6 +172,7 @@ func TestExpirePod(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[1]},
+			usedPorts:           map[int]bool{80: false, 8080: true},
 		},
 	}}
 
@@ -217,6 +221,7 @@ func TestAddPodWillConfirm(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[0]},
+			usedPorts:           map[int]bool{80: true, 8080: false},
 		},
 	}}
 
@@ -261,6 +266,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{basePod},
+			usedPorts:           map[int]bool{80: true},
 		},
 	}}
 
@@ -313,6 +319,7 @@ func TestUpdatePod(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[1]},
+			usedPorts:           map[int]bool{8080: true},
 		}, {
 			requestedResource: &Resource{
 				MilliCPU: 100,
@@ -324,6 +331,7 @@ func TestUpdatePod(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[0]},
+			usedPorts:           map[int]bool{80: true},
 		}},
 	}}
 
@@ -378,6 +386,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[1]},
+			usedPorts:           map[int]bool{8080: true},
 		}, {
 			requestedResource: &Resource{
 				MilliCPU: 100,
@@ -389,6 +398,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[0]},
+			usedPorts:           map[int]bool{80: true},
 		}},
 	}}
 
@@ -443,6 +453,7 @@ func TestRemovePod(t *testing.T) {
 			},
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{basePod},
+			usedPorts:           map[int]bool{80: true},
 		},
 	}}
 
diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go
index daf006bf62a..1cde91696ca 100644
--- a/plugin/pkg/scheduler/schedulercache/node_info.go
+++ b/plugin/pkg/scheduler/schedulercache/node_info.go
@@ -36,6 +36,7 @@ type NodeInfo struct {
 
 	pods             []*v1.Pod
 	podsWithAffinity []*v1.Pod
+	usedPorts        map[int]bool
 
 	// Total requested resource of all pods on this node.
 	// It includes assumed pods which scheduler sends binding to apiserver but
@@ -117,6 +118,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
 		allocatableResource: &Resource{},
 		allowedPodNumber:    0,
 		generation:          0,
+		usedPorts:           make(map[int]bool),
 	}
 	for _, pod := range pods {
 		ni.addPod(pod)
@@ -140,6 +142,13 @@ func (n *NodeInfo) Pods() []*v1.Pod {
 	return n.pods
 }
 
+func (n *NodeInfo) UsedPorts() map[int]bool {
+	if n == nil {
+		return nil
+	}
+	return n.usedPorts
+}
+
 // PodsWithAffinity return all pods with (anti)affinity constraints on this node.
 func (n *NodeInfo) PodsWithAffinity() []*v1.Pod {
 	if n == nil {
@@ -215,6 +224,12 @@ func (n *NodeInfo) Clone() *NodeInfo {
 	if len(n.pods) > 0 {
 		clone.pods = append([]*v1.Pod(nil), n.pods...)
 	}
+	if len(n.usedPorts) > 0 {
+		clone.usedPorts = make(map[int]bool)
+		for k, v := range n.usedPorts {
+			clone.usedPorts[k] = v
+		}
+	}
 	if len(n.podsWithAffinity) > 0 {
 		clone.podsWithAffinity = append([]*v1.Pod(nil), n.podsWithAffinity...)
 	}
@@ -230,7 +245,7 @@ func (n *NodeInfo) String() string {
 	for i, pod := range n.pods {
 		podKeys[i] = pod.Name
 	}
-	return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v}", podKeys, n.requestedResource, n.nonzeroRequest)
+	return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v, UsedPort: %#v}", podKeys, n.requestedResource, n.nonzeroRequest, n.usedPorts)
 }
 
 func hasPodAffinityConstraints(pod *v1.Pod) bool {
@@ -256,6 +271,10 @@ func (n *NodeInfo) addPod(pod *v1.Pod) {
 	if hasPodAffinityConstraints(pod) {
 		n.podsWithAffinity = append(n.podsWithAffinity, pod)
 	}
+
+	// Consume ports when pods added.
+	n.updateUsedPorts(pod, true)
+
 	n.generation++
 }
 
@@ -303,7 +322,12 @@ func (n *NodeInfo) removePod(pod *v1.Pod) error {
 			}
 			n.nonzeroRequest.MilliCPU -= non0_cpu
 			n.nonzeroRequest.Memory -= non0_mem
+
+			// Release ports when remove Pods.
+			n.updateUsedPorts(pod, false)
+
 			n.generation++
+
 			return nil
 		}
 	}
@@ -335,6 +359,20 @@ func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int6
 	return
 }
 
+func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, used bool) {
+	for j := range pod.Spec.Containers {
+		container := &pod.Spec.Containers[j]
+		for k := range container.Ports {
+			podPort := &container.Ports[k]
+			// "0" is explicitly ignored in PodFitsHostPorts,
+			// which is the only function that uses this value.
+			if podPort.HostPort != 0 {
+				n.usedPorts[int(podPort.HostPort)] = used
+			}
+		}
+	}
+}
+
 // Sets the overall node information.
 func (n *NodeInfo) SetNode(node *v1.Node) error {
 	n.node = node
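For context, here is a minimal, self-contained sketch of the idea behind this change: host ports are aggregated into a map as pods are added to a node, so the host-port predicate can consult the cached map instead of rescanning every pod via GetUsedPorts(nodeInfo.Pods()...). The types and names below (nodeInfo, addPod, fitsHostPorts) are illustrative stand-ins, not the scheduler's actual API.

```go
package main

import "fmt"

// Simplified stand-ins for the pod/container port types used by the scheduler.
type containerPort struct{ HostPort int32 }
type container struct{ Ports []containerPort }
type podSpec struct{ Containers []container }
type pod struct{ Spec podSpec }

// nodeInfo mirrors the aggregated usedPorts map added to schedulercache.NodeInfo.
type nodeInfo struct {
	usedPorts map[int]bool
}

// addPod mirrors NodeInfo.addPod + updateUsedPorts(pod, true): each non-zero
// HostPort of the pod's containers is marked as in use on this node.
func (n *nodeInfo) addPod(p *pod) {
	for i := range p.Spec.Containers {
		for _, cp := range p.Spec.Containers[i].Ports {
			if cp.HostPort != 0 { // port 0 is ignored, as in PodFitsHostPorts
				n.usedPorts[int(cp.HostPort)] = true
			}
		}
	}
}

// fitsHostPorts mirrors the updated PodFitsHostPorts check: it reads the
// pre-aggregated map rather than recomputing used ports from all pods.
func (n *nodeInfo) fitsHostPorts(wantPorts map[int]bool) bool {
	for wport := range wantPorts {
		if wport != 0 && n.usedPorts[wport] {
			return false
		}
	}
	return true
}

func main() {
	ni := &nodeInfo{usedPorts: make(map[int]bool)}
	ni.addPod(&pod{Spec: podSpec{Containers: []container{
		{Ports: []containerPort{{HostPort: 80}}},
	}}})

	fmt.Println(ni.fitsHostPorts(map[int]bool{80: true}))   // false: host port 80 already in use
	fmt.Println(ni.fitsHostPorts(map[int]bool{8080: true})) // true: host port 8080 is free
}
```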