Merge pull request #42524 from k82cn/used_ports_per_node

Automatic merge from submit-queue (batch tested with PRs 41775, 39678, 42629, 42524, 43028)

Aggregated used ports at the NodeInfo level.

fixes #42523

```release-note
Aggregated used ports at the NodeInfo level for `PodFitsHostPorts` predicate.
```
Kubernetes Submit Queue authored 2017-04-07 17:44:19 -07:00, committed via GitHub
Commit fed535e199
3 changed files with 51 additions and 3 deletions
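For context, here is a minimal standalone sketch of the idea behind this change (illustrative code only, not the scheduler's own types or helpers): host ports in use on a node are kept in a `map[int]bool` that is updated incrementally as pods are added and removed, so the `PodFitsHostPorts` predicate can answer with a map lookup instead of rescanning every pod on the node.

```go
package main

import "fmt"

// portSet is an illustrative stand-in for the usedPorts map that the
// real NodeInfo maintains (host port -> "in use" flag).
type portSet map[int]bool

// update mirrors the add/remove bookkeeping: ports are marked used when
// a pod lands on the node and marked unused when it leaves.
func (s portSet) update(hostPorts []int, used bool) {
	for _, p := range hostPorts {
		if p != 0 { // host port 0 means "no host port requested"
			s[p] = used
		}
	}
}

// fits is a simplified PodFitsHostPorts-style check: the pod fits if
// none of its wanted host ports are already marked used on the node.
func (s portSet) fits(wantPorts []int) bool {
	for _, p := range wantPorts {
		if p != 0 && s[p] {
			return false
		}
	}
	return true
}

func main() {
	node := portSet{}
	node.update([]int{80}, true)        // pod A added, uses host port 80
	fmt.Println(node.fits([]int{80}))   // false: 80 already taken
	fmt.Println(node.fits([]int{8080})) // true
	node.update([]int{80}, false)       // pod A removed
	fmt.Println(node.fits([]int{80}))   // true again
}
```

The diffs below add exactly this bookkeeping to `NodeInfo` and switch the predicate over to it.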


```diff
@@ -836,8 +836,7 @@ func PodFitsHostPorts(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.No
         return true, nil, nil
     }
 
-    // TODO: Aggregate it at the NodeInfo level.
-    existingPorts := GetUsedPorts(nodeInfo.Pods()...)
+    existingPorts := nodeInfo.UsedPorts()
     for wport := range wantPorts {
         if wport != 0 && existingPorts[wport] {
             return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
```
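For contrast, here is a self-contained sketch of the work the removed `GetUsedPorts(nodeInfo.Pods()...)` call implied on every predicate evaluation: rebuilding the host-port map by walking every pod on the node. The real helper operates on `*v1.Pod`; the local types below are illustrative stand-ins, and the walk mirrors the `updateUsedPorts` method added further down.

```go
package main

import "fmt"

// Minimal local stand-ins for the pod/container/port shapes involved;
// these are NOT the Kubernetes API types, just enough to show the walk.
type containerPort struct{ HostPort int32 }
type container struct{ Ports []containerPort }
type podSpec struct{ Containers []container }
type pod struct{ Spec podSpec }

// usedHostPorts sketches the per-call aggregation the predicate used to
// do: walk every pod, every container, and every container port on the
// node and rebuild the host-port map from scratch.
func usedHostPorts(pods ...*pod) map[int]bool {
	ports := make(map[int]bool)
	for _, p := range pods {
		for i := range p.Spec.Containers {
			for _, cp := range p.Spec.Containers[i].Ports {
				if cp.HostPort != 0 {
					ports[int(cp.HostPort)] = true
				}
			}
		}
	}
	return ports
}

func main() {
	web := &pod{Spec: podSpec{Containers: []container{{Ports: []containerPort{{HostPort: 80}}}}}}
	fmt.Println(usedHostPorts(web)) // map[80:true]
}
```

With this PR, that walk happens once per pod add/remove in the scheduler cache rather than once per predicate call.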


```diff
@@ -67,6 +67,7 @@ func TestAssumePodScheduled(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{testPods[0]},
+            usedPorts:           map[int]bool{80: true},
         },
     }, {
         pods: []*v1.Pod{testPods[1], testPods[2]},
@@ -81,6 +82,7 @@ func TestAssumePodScheduled(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{testPods[1], testPods[2]},
+            usedPorts:           map[int]bool{80: true, 8080: true},
         },
     }, { // test non-zero request
         pods: []*v1.Pod{testPods[3]},
@@ -95,6 +97,7 @@ func TestAssumePodScheduled(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{testPods[3]},
+            usedPorts:           map[int]bool{80: true},
         },
     }}
@@ -169,6 +172,7 @@ func TestExpirePod(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{testPods[1]},
+            usedPorts:           map[int]bool{80: false, 8080: true},
         },
     }}
@@ -217,6 +221,7 @@ func TestAddPodWillConfirm(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{testPods[0]},
+            usedPorts:           map[int]bool{80: true, 8080: false},
         },
     }}
@@ -261,6 +266,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{basePod},
+            usedPorts:           map[int]bool{80: true},
         },
     }}
@@ -313,6 +319,7 @@ func TestUpdatePod(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{testPods[1]},
+            usedPorts:           map[int]bool{8080: true},
         }, {
             requestedResource: &Resource{
                 MilliCPU: 100,
@@ -324,6 +331,7 @@ func TestUpdatePod(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{testPods[0]},
+            usedPorts:           map[int]bool{80: true},
         }},
     }}
@@ -378,6 +386,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{testPods[1]},
+            usedPorts:           map[int]bool{8080: true},
         }, {
             requestedResource: &Resource{
                 MilliCPU: 100,
@@ -389,6 +398,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{testPods[0]},
+            usedPorts:           map[int]bool{80: true},
         }},
     }}
@@ -443,6 +453,7 @@ func TestRemovePod(t *testing.T) {
             },
             allocatableResource: &Resource{},
             pods:                []*v1.Pod{basePod},
+            usedPorts:           map[int]bool{80: true},
         },
     }}
```
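One detail visible in the expectations above: removing or expiring a pod flips its ports to `false` rather than deleting the keys (see `updateUsedPorts` below), which is why `TestExpirePod` expects `map[int]bool{80: false, 8080: true}` and `TestAddPodWillConfirm` expects `map[int]bool{80: true, 8080: false}`. That is harmless for the predicate, since a missing key and an explicit `false` read the same way; a quick illustration:

```go
package main

import "fmt"

func main() {
	used := map[int]bool{80: false, 8080: true}
	fmt.Println(used[80])   // false: port 80 was released; key kept with value false
	fmt.Println(used[9090]) // false: port 9090 was never used; key absent
	fmt.Println(used[8080]) // true: port 8080 is still consumed
}
```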


```diff
@@ -36,6 +36,7 @@ type NodeInfo struct {
     pods             []*v1.Pod
     podsWithAffinity []*v1.Pod
+    usedPorts        map[int]bool
 
     // Total requested resource of all pods on this node.
     // It includes assumed pods which scheduler sends binding to apiserver but
@@ -117,6 +118,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
         allocatableResource: &Resource{},
         allowedPodNumber:    0,
         generation:          0,
+        usedPorts:           make(map[int]bool),
     }
     for _, pod := range pods {
         ni.addPod(pod)
@@ -140,6 +142,13 @@ func (n *NodeInfo) Pods() []*v1.Pod {
     return n.pods
 }
 
+func (n *NodeInfo) UsedPorts() map[int]bool {
+    if n == nil {
+        return nil
+    }
+    return n.usedPorts
+}
+
 // PodsWithAffinity return all pods with (anti)affinity constraints on this node.
 func (n *NodeInfo) PodsWithAffinity() []*v1.Pod {
     if n == nil {
@@ -215,6 +224,12 @@ func (n *NodeInfo) Clone() *NodeInfo {
     if len(n.pods) > 0 {
         clone.pods = append([]*v1.Pod(nil), n.pods...)
     }
+    if len(n.usedPorts) > 0 {
+        clone.usedPorts = make(map[int]bool)
+        for k, v := range n.usedPorts {
+            clone.usedPorts[k] = v
+        }
+    }
     if len(n.podsWithAffinity) > 0 {
         clone.podsWithAffinity = append([]*v1.Pod(nil), n.podsWithAffinity...)
     }
@@ -230,7 +245,7 @@ func (n *NodeInfo) String() string {
     for i, pod := range n.pods {
         podKeys[i] = pod.Name
     }
-    return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v}", podKeys, n.requestedResource, n.nonzeroRequest)
+    return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v, UsedPort: %#v}", podKeys, n.requestedResource, n.nonzeroRequest, n.usedPorts)
 }
 
 func hasPodAffinityConstraints(pod *v1.Pod) bool {
@@ -256,6 +271,10 @@ func (n *NodeInfo) addPod(pod *v1.Pod) {
     if hasPodAffinityConstraints(pod) {
         n.podsWithAffinity = append(n.podsWithAffinity, pod)
     }
+
+    // Consume ports when pods added.
+    n.updateUsedPorts(pod, true)
+
     n.generation++
 }
@@ -303,7 +322,12 @@ func (n *NodeInfo) removePod(pod *v1.Pod) error {
             }
             n.nonzeroRequest.MilliCPU -= non0_cpu
             n.nonzeroRequest.Memory -= non0_mem
+
+            // Release ports when remove Pods.
+            n.updateUsedPorts(pod, false)
+
             n.generation++
+
             return nil
         }
     }
@@ -335,6 +359,20 @@ func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int6
     return
 }
 
+func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, used bool) {
+    for j := range pod.Spec.Containers {
+        container := &pod.Spec.Containers[j]
+        for k := range container.Ports {
+            podPort := &container.Ports[k]
+            // "0" is explicitly ignored in PodFitsHostPorts,
+            // which is the only function that uses this value.
+            if podPort.HostPort != 0 {
+                n.usedPorts[int(podPort.HostPort)] = used
+            }
+        }
+    }
+}
+
 // Sets the overall node information.
 func (n *NodeInfo) SetNode(node *v1.Node) error {
     n.node = node
```