mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Refactor HostIP predicate algorithm
- Remove string decode logic. It's not really helping to find the conflict ports, and it's expensive to do encoding/decoding - Not to parse the container ports information in predicate meta, use straight []*v1.ContainerPort - Use better data structure to search port conflict based on ip addresses - Collect scattered source code into common place
This commit is contained in:
parent
8a9954d471
commit
68c2c79362
@ -57,7 +57,6 @@ go_test(
|
||||
"//pkg/scheduler/algorithm:go_default_library",
|
||||
"//pkg/scheduler/schedulercache:go_default_library",
|
||||
"//pkg/scheduler/testing:go_default_library",
|
||||
"//pkg/scheduler/util:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/api/storage/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
|
@ -46,7 +46,7 @@ type predicateMetadata struct {
|
||||
pod *v1.Pod
|
||||
podBestEffort bool
|
||||
podRequest *schedulercache.Resource
|
||||
podPorts map[string]bool
|
||||
podPorts []*v1.ContainerPort
|
||||
//key is a pod full name with the anti-affinity rules.
|
||||
matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm
|
||||
serviceAffinityInUse bool
|
||||
@ -90,7 +90,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
|
||||
pod: pod,
|
||||
podBestEffort: isPodBestEffort(pod),
|
||||
podRequest: GetResourceRequest(pod),
|
||||
podPorts: schedutil.GetUsedPorts(pod),
|
||||
podPorts: schedutil.GetContainerPorts(pod),
|
||||
matchingAntiAffinityTerms: matchingTerms,
|
||||
}
|
||||
for predicateName, precomputeFunc := range predicateMetadataProducers {
|
||||
@ -172,10 +172,7 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
|
||||
podRequest: meta.podRequest,
|
||||
serviceAffinityInUse: meta.serviceAffinityInUse,
|
||||
}
|
||||
newPredMeta.podPorts = map[string]bool{}
|
||||
for k, v := range meta.podPorts {
|
||||
newPredMeta.podPorts[k] = v
|
||||
}
|
||||
newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...)
|
||||
newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{}
|
||||
for k, v := range meta.matchingAntiAffinityTerms {
|
||||
newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...)
|
||||
|
@ -373,7 +373,15 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
|
||||
Memory: 300,
|
||||
AllowedPodNumber: 4,
|
||||
},
|
||||
podPorts: map[string]bool{"1234": true, "456": false},
|
||||
podPorts: []*v1.ContainerPort{
|
||||
{
|
||||
Name: "name",
|
||||
HostPort: 10,
|
||||
ContainerPort: 20,
|
||||
Protocol: "TCP",
|
||||
HostIP: "1.2.3.4",
|
||||
},
|
||||
},
|
||||
matchingAntiAffinityTerms: map[string][]matchingPodAntiAffinityTerm{
|
||||
"term1": {
|
||||
{
|
||||
|
@ -966,12 +966,12 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta algorithm.Predi
|
||||
|
||||
// PodFitsHostPorts checks if a node has free ports for the requested pod ports.
|
||||
func PodFitsHostPorts(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
var wantPorts map[string]bool
|
||||
var wantPorts []*v1.ContainerPort
|
||||
if predicateMeta, ok := meta.(*predicateMetadata); ok {
|
||||
wantPorts = predicateMeta.podPorts
|
||||
} else {
|
||||
// We couldn't parse metadata - fallback to computing it.
|
||||
wantPorts = schedutil.GetUsedPorts(pod)
|
||||
wantPorts = schedutil.GetContainerPorts(pod)
|
||||
}
|
||||
if len(wantPorts) == 0 {
|
||||
return true, nil, nil
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
@ -32,7 +33,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
|
||||
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -518,13 +518,13 @@ func TestPodFitsHost(t *testing.T) {
|
||||
func newPod(host string, hostPortInfos ...string) *v1.Pod {
|
||||
networkPorts := []v1.ContainerPort{}
|
||||
for _, portInfo := range hostPortInfos {
|
||||
hostPortInfo := decode(portInfo)
|
||||
hostPort, _ := strconv.Atoi(hostPortInfo.hostPort)
|
||||
splited := strings.Split(portInfo, "/")
|
||||
hostPort, _ := strconv.Atoi(splited[2])
|
||||
|
||||
networkPorts = append(networkPorts, v1.ContainerPort{
|
||||
HostIP: hostPortInfo.hostIP,
|
||||
HostIP: splited[1],
|
||||
HostPort: int32(hostPort),
|
||||
Protocol: v1.Protocol(hostPortInfo.protocol),
|
||||
Protocol: v1.Protocol(splited[0]),
|
||||
})
|
||||
}
|
||||
return &v1.Pod{
|
||||
@ -653,41 +653,6 @@ func TestPodFitsHostPorts(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetUsedPorts(t *testing.T) {
|
||||
tests := []struct {
|
||||
pods []*v1.Pod
|
||||
ports map[string]bool
|
||||
}{
|
||||
{
|
||||
[]*v1.Pod{
|
||||
newPod("m1", "UDP/127.0.0.1/9090"),
|
||||
},
|
||||
map[string]bool{"UDP/127.0.0.1/9090": true},
|
||||
},
|
||||
{
|
||||
[]*v1.Pod{
|
||||
newPod("m1", "UDP/127.0.0.1/9090"),
|
||||
newPod("m1", "UDP/127.0.0.1/9091"),
|
||||
},
|
||||
map[string]bool{"UDP/127.0.0.1/9090": true, "UDP/127.0.0.1/9091": true},
|
||||
},
|
||||
{
|
||||
[]*v1.Pod{
|
||||
newPod("m1", "TCP/0.0.0.0/9090"),
|
||||
newPod("m2", "UDP/127.0.0.1/9091"),
|
||||
},
|
||||
map[string]bool{"TCP/0.0.0.0/9090": true, "UDP/127.0.0.1/9091": true},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
ports := schedutil.GetUsedPorts(test.pods...)
|
||||
if !reflect.DeepEqual(test.ports, ports) {
|
||||
t.Errorf("%s: expected %v, got %v", "test get used ports", test.ports, ports)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGCEDiskConflicts(t *testing.T) {
|
||||
volState := v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
|
@ -17,10 +17,7 @@ limitations under the License.
|
||||
package predicates
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@ -135,65 +132,11 @@ type EquivalencePod struct {
|
||||
PVCSet sets.String
|
||||
}
|
||||
|
||||
type hostPortInfo struct {
|
||||
protocol string
|
||||
hostIP string
|
||||
hostPort string
|
||||
}
|
||||
|
||||
// decode decodes string ("protocol/hostIP/hostPort") to *hostPortInfo object.
|
||||
func decode(info string) *hostPortInfo {
|
||||
hostPortInfoSlice := strings.Split(info, "/")
|
||||
|
||||
protocol := hostPortInfoSlice[0]
|
||||
hostIP := hostPortInfoSlice[1]
|
||||
hostPort := hostPortInfoSlice[2]
|
||||
|
||||
return &hostPortInfo{
|
||||
protocol: protocol,
|
||||
hostIP: hostIP,
|
||||
hostPort: hostPort,
|
||||
}
|
||||
}
|
||||
|
||||
// specialPortConflictCheck detects whether specailHostPort(whose hostIP is 0.0.0.0) is conflict with otherHostPorts.
|
||||
// return true if we have a conflict.
|
||||
func specialPortConflictCheck(specialHostPort string, otherHostPorts map[string]bool) bool {
|
||||
specialHostPortInfo := decode(specialHostPort)
|
||||
|
||||
if specialHostPortInfo.hostIP == schedutil.DefaultBindAllHostIP {
|
||||
// loop through all the otherHostPorts to see if there exists a conflict
|
||||
for hostPortItem := range otherHostPorts {
|
||||
hostPortInfo := decode(hostPortItem)
|
||||
|
||||
// if there exists one hostPortItem which has the same hostPort and protocol with the specialHostPort, that will cause a conflict
|
||||
if specialHostPortInfo.hostPort == hostPortInfo.hostPort && specialHostPortInfo.protocol == hostPortInfo.protocol {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// portsConflict check whether existingPorts and wantPorts conflict with each other
|
||||
// return true if we have a conflict
|
||||
func portsConflict(existingPorts, wantPorts map[string]bool) bool {
|
||||
|
||||
for existingPort := range existingPorts {
|
||||
if specialPortConflictCheck(existingPort, wantPorts) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
for wantPort := range wantPorts {
|
||||
if specialPortConflictCheck(wantPort, existingPorts) {
|
||||
return true
|
||||
}
|
||||
|
||||
// general check hostPort conflict procedure for hostIP is not 0.0.0.0
|
||||
if existingPorts[wantPort] {
|
||||
func portsConflict(existingPorts schedutil.HostPortInfo, wantPorts []*v1.ContainerPort) bool {
|
||||
for _, cp := range wantPorts {
|
||||
if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -18,8 +18,6 @@ package predicates
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -70,194 +68,3 @@ func ExampleFindLabelsInSet() {
|
||||
// label1=value1,label2=value2,label3=will_see_this
|
||||
// pod1,pod2,
|
||||
}
|
||||
|
||||
func Test_decode(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
args string
|
||||
want *hostPortInfo
|
||||
}{
|
||||
{
|
||||
name: "test1",
|
||||
args: "UDP/127.0.0.1/80",
|
||||
want: &hostPortInfo{
|
||||
protocol: "UDP",
|
||||
hostIP: "127.0.0.1",
|
||||
hostPort: "80",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test2",
|
||||
args: "TCP/127.0.0.1/80",
|
||||
want: &hostPortInfo{
|
||||
protocol: "TCP",
|
||||
hostIP: "127.0.0.1",
|
||||
hostPort: "80",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test3",
|
||||
args: "TCP/0.0.0.0/80",
|
||||
want: &hostPortInfo{
|
||||
protocol: "TCP",
|
||||
hostIP: "0.0.0.0",
|
||||
hostPort: "80",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
if got := decode(tt.args); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("test name = %v, decode() = %v, want %v", tt.name, got, tt.want)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func Test_specialPortConflictCheck(t *testing.T) {
|
||||
type args struct {
|
||||
specialHostPort string
|
||||
otherHostPorts map[string]bool
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "test-1",
|
||||
args: args{
|
||||
specialHostPort: "TCP/0.0.0.0/80",
|
||||
otherHostPorts: map[string]bool{
|
||||
"TCP/127.0.0.2/8080": true,
|
||||
"TCP/127.0.0.1/80": true,
|
||||
"UDP/127.0.0.2/8080": true,
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "test-2",
|
||||
args: args{
|
||||
specialHostPort: "TCP/0.0.0.0/80",
|
||||
otherHostPorts: map[string]bool{
|
||||
"TCP/127.0.0.2/8080": true,
|
||||
"UDP/127.0.0.1/80": true,
|
||||
"UDP/127.0.0.2/8080": true,
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "test-3",
|
||||
args: args{
|
||||
specialHostPort: "TCP/0.0.0.0/80",
|
||||
otherHostPorts: map[string]bool{
|
||||
"TCP/127.0.0.2/8080": true,
|
||||
"TCP/127.0.0.1/8090": true,
|
||||
"UDP/127.0.0.2/8080": true,
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "test-4",
|
||||
args: args{
|
||||
specialHostPort: "TCP/0.0.0.0/80",
|
||||
otherHostPorts: map[string]bool{
|
||||
"UDP/127.0.0.2/8080": true,
|
||||
"UDP/127.0.0.1/8090": true,
|
||||
"TCP/127.0.0.2/8080": true,
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := specialPortConflictCheck(tt.args.specialHostPort, tt.args.otherHostPorts); got != tt.want {
|
||||
t.Errorf("specialPortConflictCheck() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_portsConflict(t *testing.T) {
|
||||
type args struct {
|
||||
existingPorts map[string]bool
|
||||
wantPorts map[string]bool
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "test1",
|
||||
args: args{
|
||||
existingPorts: map[string]bool{
|
||||
"UDP/127.0.0.1/8080": true,
|
||||
},
|
||||
wantPorts: map[string]bool{
|
||||
"UDP/127.0.0.1/8080": true,
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "test2",
|
||||
args: args{
|
||||
existingPorts: map[string]bool{
|
||||
"UDP/127.0.0.2/8080": true,
|
||||
},
|
||||
wantPorts: map[string]bool{
|
||||
"UDP/127.0.0.1/8080": true,
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "test3",
|
||||
args: args{
|
||||
existingPorts: map[string]bool{
|
||||
"TCP/127.0.0.1/8080": true,
|
||||
},
|
||||
wantPorts: map[string]bool{
|
||||
"UDP/127.0.0.1/8080": true,
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "test4",
|
||||
args: args{
|
||||
existingPorts: map[string]bool{
|
||||
"TCP/0.0.0.0/8080": true,
|
||||
},
|
||||
wantPorts: map[string]bool{
|
||||
"TCP/127.0.0.1/8080": true,
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "test5",
|
||||
args: args{
|
||||
existingPorts: map[string]bool{
|
||||
"TCP/127.0.0.1/8080": true,
|
||||
},
|
||||
wantPorts: map[string]bool{
|
||||
"TCP/0.0.0.0/8080": true,
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := portsConflict(tt.args.existingPorts, tt.args.wantPorts); got != tt.want {
|
||||
t.Errorf("portsConflict() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -43,6 +43,32 @@ func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *No
|
||||
}
|
||||
}
|
||||
|
||||
type hostPortInfoParam struct {
|
||||
protocol, ip string
|
||||
port int32
|
||||
}
|
||||
|
||||
type hostPortInfoBuilder struct {
|
||||
inputs []hostPortInfoParam
|
||||
}
|
||||
|
||||
func newHostPortInfoBuilder() *hostPortInfoBuilder {
|
||||
return &hostPortInfoBuilder{}
|
||||
}
|
||||
|
||||
func (b *hostPortInfoBuilder) add(protocol, ip string, port int32) *hostPortInfoBuilder {
|
||||
b.inputs = append(b.inputs, hostPortInfoParam{protocol, ip, port})
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *hostPortInfoBuilder) build() schedutil.HostPortInfo {
|
||||
res := make(schedutil.HostPortInfo)
|
||||
for _, param := range b.inputs {
|
||||
res.Add(param.ip, param.protocol, param.port)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
|
||||
// on node level.
|
||||
func TestAssumePodScheduled(t *testing.T) {
|
||||
@ -74,7 +100,7 @@ func TestAssumePodScheduled(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[0]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
|
||||
},
|
||||
}, {
|
||||
pods: []*v1.Pod{testPods[1], testPods[2]},
|
||||
@ -89,7 +115,7 @@ func TestAssumePodScheduled(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[1], testPods[2]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true, "TCP/127.0.0.1/8080": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
|
||||
},
|
||||
}, { // test non-zero request
|
||||
pods: []*v1.Pod{testPods[3]},
|
||||
@ -104,7 +130,7 @@ func TestAssumePodScheduled(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[3]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
|
||||
},
|
||||
}, {
|
||||
pods: []*v1.Pod{testPods[4]},
|
||||
@ -120,7 +146,7 @@ func TestAssumePodScheduled(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[4]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
|
||||
},
|
||||
}, {
|
||||
pods: []*v1.Pod{testPods[4], testPods[5]},
|
||||
@ -136,7 +162,7 @@ func TestAssumePodScheduled(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[4], testPods[5]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true, "TCP/127.0.0.1/8080": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
|
||||
},
|
||||
}, {
|
||||
pods: []*v1.Pod{testPods[6]},
|
||||
@ -151,7 +177,7 @@ func TestAssumePodScheduled(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[6]},
|
||||
usedPorts: map[string]bool{},
|
||||
usedPorts: newHostPortInfoBuilder().build(),
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -227,7 +253,7 @@ func TestExpirePod(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[1]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
|
||||
},
|
||||
}}
|
||||
|
||||
@ -276,7 +302,7 @@ func TestAddPodWillConfirm(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[0]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
|
||||
},
|
||||
}}
|
||||
|
||||
@ -331,7 +357,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{updatedPod.DeepCopy()},
|
||||
usedPorts: map[string]bool{"TCP/0.0.0.0/90": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(),
|
||||
},
|
||||
},
|
||||
}}
|
||||
@ -383,7 +409,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{basePod},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
|
||||
},
|
||||
}}
|
||||
|
||||
@ -436,7 +462,7 @@ func TestUpdatePod(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[1]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
|
||||
}, {
|
||||
requestedResource: &Resource{
|
||||
MilliCPU: 100,
|
||||
@ -448,7 +474,7 @@ func TestUpdatePod(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[0]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
|
||||
}},
|
||||
}}
|
||||
|
||||
@ -503,7 +529,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[1]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
|
||||
}, {
|
||||
requestedResource: &Resource{
|
||||
MilliCPU: 100,
|
||||
@ -515,7 +541,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{testPods[0]},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
|
||||
}},
|
||||
}}
|
||||
|
||||
@ -569,7 +595,7 @@ func TestRemovePod(t *testing.T) {
|
||||
},
|
||||
allocatableResource: &Resource{},
|
||||
pods: []*v1.Pod{basePod},
|
||||
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
|
||||
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
|
||||
},
|
||||
}}
|
||||
|
||||
@ -672,7 +698,7 @@ func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *NodeInfo {
|
||||
expected.pods = append(expected.pods, pod)
|
||||
expected.requestedResource.Add(getResourceRequest(pod))
|
||||
expected.nonzeroRequest.Add(getResourceRequest(pod))
|
||||
expected.usedPorts = schedutil.GetUsedPorts(pod)
|
||||
expected.updateUsedPorts(pod, true)
|
||||
expected.generation++
|
||||
}
|
||||
|
||||
|
@ -38,7 +38,7 @@ type NodeInfo struct {
|
||||
|
||||
pods []*v1.Pod
|
||||
podsWithAffinity []*v1.Pod
|
||||
usedPorts map[string]bool
|
||||
usedPorts util.HostPortInfo
|
||||
|
||||
// Total requested resource of all pods on this node.
|
||||
// It includes assumed pods which scheduler sends binding to apiserver but
|
||||
@ -164,7 +164,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
|
||||
nonzeroRequest: &Resource{},
|
||||
allocatableResource: &Resource{},
|
||||
generation: 0,
|
||||
usedPorts: make(map[string]bool),
|
||||
usedPorts: make(util.HostPortInfo),
|
||||
}
|
||||
for _, pod := range pods {
|
||||
ni.AddPod(pod)
|
||||
@ -188,7 +188,7 @@ func (n *NodeInfo) Pods() []*v1.Pod {
|
||||
return n.pods
|
||||
}
|
||||
|
||||
func (n *NodeInfo) UsedPorts() map[string]bool {
|
||||
func (n *NodeInfo) UsedPorts() util.HostPortInfo {
|
||||
if n == nil {
|
||||
return nil
|
||||
}
|
||||
@ -269,7 +269,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
|
||||
taintsErr: n.taintsErr,
|
||||
memoryPressureCondition: n.memoryPressureCondition,
|
||||
diskPressureCondition: n.diskPressureCondition,
|
||||
usedPorts: make(map[string]bool),
|
||||
usedPorts: make(util.HostPortInfo),
|
||||
generation: n.generation,
|
||||
}
|
||||
if len(n.pods) > 0 {
|
||||
@ -400,34 +400,15 @@ func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int6
|
||||
return
|
||||
}
|
||||
|
||||
func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, used bool) {
|
||||
func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
|
||||
for j := range pod.Spec.Containers {
|
||||
container := &pod.Spec.Containers[j]
|
||||
for k := range container.Ports {
|
||||
podPort := &container.Ports[k]
|
||||
// "0" is explicitly ignored in PodFitsHostPorts,
|
||||
// which is the only function that uses this value.
|
||||
if podPort.HostPort != 0 {
|
||||
// user does not explicitly set protocol, default is tcp
|
||||
portProtocol := podPort.Protocol
|
||||
if podPort.Protocol == "" {
|
||||
portProtocol = v1.ProtocolTCP
|
||||
}
|
||||
|
||||
// user does not explicitly set hostIP, default is 0.0.0.0
|
||||
portHostIP := podPort.HostIP
|
||||
if podPort.HostIP == "" {
|
||||
portHostIP = util.DefaultBindAllHostIP
|
||||
}
|
||||
|
||||
str := fmt.Sprintf("%s/%s/%d", portProtocol, portHostIP, podPort.HostPort)
|
||||
|
||||
if used {
|
||||
n.usedPorts[str] = used
|
||||
} else {
|
||||
delete(n.usedPorts, str)
|
||||
}
|
||||
|
||||
if add {
|
||||
n.usedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
|
||||
} else {
|
||||
n.usedPorts.Remove(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
@ -28,33 +27,126 @@ import (
|
||||
|
||||
const DefaultBindAllHostIP = "0.0.0.0"
|
||||
|
||||
// GetUsedPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair
|
||||
// ProtocolPort represents a protocol port pair, e.g. tcp:80.
|
||||
type ProtocolPort struct {
|
||||
Protocol string
|
||||
Port int32
|
||||
}
|
||||
|
||||
// NewProtocolPort creates a ProtocolPort instance.
|
||||
func NewProtocolPort(protocol string, port int32) *ProtocolPort {
|
||||
pp := &ProtocolPort{
|
||||
Protocol: protocol,
|
||||
Port: port,
|
||||
}
|
||||
|
||||
if len(pp.Protocol) == 0 {
|
||||
pp.Protocol = string(v1.ProtocolTCP)
|
||||
}
|
||||
|
||||
return pp
|
||||
}
|
||||
|
||||
// HostPortInfo stores mapping from ip to a set of ProtocolPort
|
||||
type HostPortInfo map[string]map[ProtocolPort]struct{}
|
||||
|
||||
// Add adds (ip, protocol, port) to HostPortInfo
|
||||
func (h HostPortInfo) Add(ip, protocol string, port int32) {
|
||||
if port <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
h.sanitize(&ip, &protocol)
|
||||
|
||||
pp := NewProtocolPort(protocol, port)
|
||||
if _, ok := h[ip]; !ok {
|
||||
h[ip] = map[ProtocolPort]struct{}{
|
||||
*pp: {},
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
h[ip][*pp] = struct{}{}
|
||||
}
|
||||
|
||||
// Remove removes (ip, protocol, port) from HostPortInfo
|
||||
func (h HostPortInfo) Remove(ip, protocol string, port int32) {
|
||||
if port <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
h.sanitize(&ip, &protocol)
|
||||
|
||||
pp := NewProtocolPort(protocol, port)
|
||||
if m, ok := h[ip]; ok {
|
||||
delete(m, *pp)
|
||||
if len(h[ip]) == 0 {
|
||||
delete(h, ip)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Len returns the total number of (ip, protocol, port) tuple in HostPortInfo
|
||||
func (h HostPortInfo) Len() int {
|
||||
length := 0
|
||||
for _, m := range h {
|
||||
length += len(m)
|
||||
}
|
||||
return length
|
||||
}
|
||||
|
||||
// CheckConflict checks if the input (ip, protocol, port) conflicts with the existing
|
||||
// ones in HostPortInfo.
|
||||
func (h HostPortInfo) CheckConflict(ip, protocol string, port int32) bool {
|
||||
if port <= 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
h.sanitize(&ip, &protocol)
|
||||
|
||||
pp := NewProtocolPort(protocol, port)
|
||||
|
||||
// If ip is 0.0.0.0 check all IP's (protocol, port) pair
|
||||
if ip == DefaultBindAllHostIP {
|
||||
for _, m := range h {
|
||||
if _, ok := m[*pp]; ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// If ip isn't 0.0.0.0, only check IP and 0.0.0.0's (protocol, port) pair
|
||||
for _, key := range []string{DefaultBindAllHostIP, ip} {
|
||||
if m, ok := h[key]; ok {
|
||||
if _, ok2 := m[*pp]; ok2 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// sanitize the parameters
|
||||
func (h HostPortInfo) sanitize(ip, protocol *string) {
|
||||
if len(*ip) == 0 {
|
||||
*ip = DefaultBindAllHostIP
|
||||
}
|
||||
if len(*protocol) == 0 {
|
||||
*protocol = string(v1.ProtocolTCP)
|
||||
}
|
||||
}
|
||||
|
||||
// GetContainerPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair
|
||||
// will be in the result; but it does not resolve port conflict.
|
||||
func GetUsedPorts(pods ...*v1.Pod) map[string]bool {
|
||||
ports := make(map[string]bool)
|
||||
func GetContainerPorts(pods ...*v1.Pod) []*v1.ContainerPort {
|
||||
var ports []*v1.ContainerPort
|
||||
for _, pod := range pods {
|
||||
for j := range pod.Spec.Containers {
|
||||
container := &pod.Spec.Containers[j]
|
||||
for k := range container.Ports {
|
||||
podPort := &container.Ports[k]
|
||||
// "0" is explicitly ignored in PodFitsHostPorts,
|
||||
// which is the only function that uses this value.
|
||||
if podPort.HostPort != 0 {
|
||||
// user does not explicitly set protocol, default is tcp
|
||||
portProtocol := podPort.Protocol
|
||||
if podPort.Protocol == "" {
|
||||
portProtocol = v1.ProtocolTCP
|
||||
}
|
||||
|
||||
// user does not explicitly set hostIP, default is 0.0.0.0
|
||||
portHostIP := podPort.HostIP
|
||||
if podPort.HostIP == "" {
|
||||
portHostIP = "0.0.0.0"
|
||||
}
|
||||
|
||||
str := fmt.Sprintf("%s/%s/%d", portProtocol, portHostIP, podPort.HostPort)
|
||||
ports[str] = true
|
||||
}
|
||||
ports = append(ports, &container.Ports[k])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -93,3 +93,213 @@ func TestSortableList(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type hostPortInfoParam struct {
|
||||
protocol, ip string
|
||||
port int32
|
||||
}
|
||||
|
||||
func TestHostPortInfo_AddRemove(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
added []hostPortInfoParam
|
||||
removed []hostPortInfoParam
|
||||
length int
|
||||
}{
|
||||
{
|
||||
desc: "normal add case",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "127.0.0.1", 79},
|
||||
{"UDP", "127.0.0.1", 80},
|
||||
{"TCP", "127.0.0.1", 81},
|
||||
{"TCP", "127.0.0.1", 82},
|
||||
// this might not make sense in real case, but the struct doesn't forbid it.
|
||||
{"TCP", "0.0.0.0", 79},
|
||||
{"UDP", "0.0.0.0", 80},
|
||||
{"TCP", "0.0.0.0", 81},
|
||||
{"TCP", "0.0.0.0", 82},
|
||||
{"TCP", "0.0.0.0", 0},
|
||||
{"TCP", "0.0.0.0", -1},
|
||||
},
|
||||
length: 8,
|
||||
},
|
||||
{
|
||||
desc: "empty ip and protocol add should work",
|
||||
added: []hostPortInfoParam{
|
||||
{"", "127.0.0.1", 79},
|
||||
{"UDP", "127.0.0.1", 80},
|
||||
{"", "127.0.0.1", 81},
|
||||
{"", "127.0.0.1", 82},
|
||||
{"", "", 79},
|
||||
{"UDP", "", 80},
|
||||
{"", "", 81},
|
||||
{"", "", 82},
|
||||
{"", "", 0},
|
||||
{"", "", -1},
|
||||
},
|
||||
length: 8,
|
||||
},
|
||||
{
|
||||
desc: "normal remove case",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "127.0.0.1", 79},
|
||||
{"UDP", "127.0.0.1", 80},
|
||||
{"TCP", "127.0.0.1", 81},
|
||||
{"TCP", "127.0.0.1", 82},
|
||||
{"TCP", "0.0.0.0", 79},
|
||||
{"UDP", "0.0.0.0", 80},
|
||||
{"TCP", "0.0.0.0", 81},
|
||||
{"TCP", "0.0.0.0", 82},
|
||||
},
|
||||
removed: []hostPortInfoParam{
|
||||
{"TCP", "127.0.0.1", 79},
|
||||
{"UDP", "127.0.0.1", 80},
|
||||
{"TCP", "127.0.0.1", 81},
|
||||
{"TCP", "127.0.0.1", 82},
|
||||
{"TCP", "0.0.0.0", 79},
|
||||
{"UDP", "0.0.0.0", 80},
|
||||
{"TCP", "0.0.0.0", 81},
|
||||
{"TCP", "0.0.0.0", 82},
|
||||
},
|
||||
length: 0,
|
||||
},
|
||||
{
|
||||
desc: "empty ip and protocol remove should work",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "127.0.0.1", 79},
|
||||
{"UDP", "127.0.0.1", 80},
|
||||
{"TCP", "127.0.0.1", 81},
|
||||
{"TCP", "127.0.0.1", 82},
|
||||
{"TCP", "0.0.0.0", 79},
|
||||
{"UDP", "0.0.0.0", 80},
|
||||
{"TCP", "0.0.0.0", 81},
|
||||
{"TCP", "0.0.0.0", 82},
|
||||
},
|
||||
removed: []hostPortInfoParam{
|
||||
{"", "127.0.0.1", 79},
|
||||
{"", "127.0.0.1", 81},
|
||||
{"", "127.0.0.1", 82},
|
||||
{"UDP", "127.0.0.1", 80},
|
||||
{"", "", 79},
|
||||
{"", "", 81},
|
||||
{"", "", 82},
|
||||
{"UDP", "", 80},
|
||||
},
|
||||
length: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
hp := make(HostPortInfo)
|
||||
for _, param := range test.added {
|
||||
hp.Add(param.ip, param.protocol, param.port)
|
||||
}
|
||||
for _, param := range test.removed {
|
||||
hp.Remove(param.ip, param.protocol, param.port)
|
||||
}
|
||||
if hp.Len() != test.length {
|
||||
t.Errorf("%v failed: expect length %d; got %d", test.desc, test.length, hp.Len())
|
||||
t.Error(hp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHostPortInfo_Check(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
added []hostPortInfoParam
|
||||
check hostPortInfoParam
|
||||
expect bool
|
||||
}{
|
||||
{
|
||||
desc: "empty check should check 0.0.0.0 and TCP",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "127.0.0.1", 80},
|
||||
},
|
||||
check: hostPortInfoParam{"", "", 81},
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
desc: "empty check should check 0.0.0.0 and TCP (conflicted)",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "127.0.0.1", 80},
|
||||
},
|
||||
check: hostPortInfoParam{"", "", 80},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
desc: "empty port check should pass",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "127.0.0.1", 80},
|
||||
},
|
||||
check: hostPortInfoParam{"", "", 0},
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
desc: "0.0.0.0 should check all registered IPs",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "127.0.0.1", 80},
|
||||
},
|
||||
check: hostPortInfoParam{"TCP", "0.0.0.0", 80},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
desc: "0.0.0.0 with different protocol should be allowed",
|
||||
added: []hostPortInfoParam{
|
||||
{"UDP", "127.0.0.1", 80},
|
||||
},
|
||||
check: hostPortInfoParam{"TCP", "0.0.0.0", 80},
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
desc: "0.0.0.0 with different port should be allowed",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "127.0.0.1", 79},
|
||||
{"TCP", "127.0.0.1", 81},
|
||||
{"TCP", "127.0.0.1", 82},
|
||||
},
|
||||
check: hostPortInfoParam{"TCP", "0.0.0.0", 80},
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
desc: "normal ip should check all registered 0.0.0.0",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "0.0.0.0", 80},
|
||||
},
|
||||
check: hostPortInfoParam{"TCP", "127.0.0.1", 80},
|
||||
expect: true,
|
||||
},
|
||||
{
|
||||
desc: "normal ip with different port/protocol should be allowed (0.0.0.0)",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "0.0.0.0", 79},
|
||||
{"UDP", "0.0.0.0", 80},
|
||||
{"TCP", "0.0.0.0", 81},
|
||||
{"TCP", "0.0.0.0", 82},
|
||||
},
|
||||
check: hostPortInfoParam{"TCP", "127.0.0.1", 80},
|
||||
expect: false,
|
||||
},
|
||||
{
|
||||
desc: "normal ip with different port/protocol should be allowed",
|
||||
added: []hostPortInfoParam{
|
||||
{"TCP", "127.0.0.1", 79},
|
||||
{"UDP", "127.0.0.1", 80},
|
||||
{"TCP", "127.0.0.1", 81},
|
||||
{"TCP", "127.0.0.1", 82},
|
||||
},
|
||||
check: hostPortInfoParam{"TCP", "127.0.0.1", 80},
|
||||
expect: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
hp := make(HostPortInfo)
|
||||
for _, param := range test.added {
|
||||
hp.Add(param.ip, param.protocol, param.port)
|
||||
}
|
||||
if hp.CheckConflict(test.check.ip, test.check.protocol, test.check.port) != test.expect {
|
||||
t.Errorf("%v failed, expected %t; got %t", test.desc, test.expect, !test.expect)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user