From 68c2c79362a506939b6513108b9b8d689cf94b0f Mon Sep 17 00:00:00 2001 From: Yongkun Anfernee Gui Date: Thu, 16 Nov 2017 15:43:06 -0800 Subject: [PATCH] Refactor HostIP predicate algorithm - Remove string decode logic. It's not really helping to find the conflict ports, and it's expensive to do encoding/decoding - Not to parse the container ports information in predicate meta, use straight []*v1.ContainerPort - Use better data structure to search port conflict based on ip addresses - Collect scattered source code into common place --- pkg/scheduler/algorithm/predicates/BUILD | 1 - .../algorithm/predicates/metadata.go | 9 +- .../algorithm/predicates/metadata_test.go | 10 +- .../algorithm/predicates/predicates.go | 4 +- .../algorithm/predicates/predicates_test.go | 45 +--- pkg/scheduler/algorithm/predicates/utils.go | 63 +----- .../algorithm/predicates/utils_test.go | 193 ---------------- pkg/scheduler/schedulercache/cache_test.go | 58 +++-- pkg/scheduler/schedulercache/node_info.go | 37 +-- pkg/scheduler/util/utils.go | 138 ++++++++++-- pkg/scheduler/util/utils_test.go | 210 ++++++++++++++++++ 11 files changed, 398 insertions(+), 370 deletions(-) diff --git a/pkg/scheduler/algorithm/predicates/BUILD b/pkg/scheduler/algorithm/predicates/BUILD index 6c091d1381d..a028dac6259 100644 --- a/pkg/scheduler/algorithm/predicates/BUILD +++ b/pkg/scheduler/algorithm/predicates/BUILD @@ -57,7 +57,6 @@ go_test( "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/schedulercache:go_default_library", "//pkg/scheduler/testing:go_default_library", - "//pkg/scheduler/util:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/storage/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go index af8c32e2c4c..b8b935335f9 100644 --- a/pkg/scheduler/algorithm/predicates/metadata.go +++ b/pkg/scheduler/algorithm/predicates/metadata.go @@ -46,7 +46,7 @@ type predicateMetadata struct { pod *v1.Pod podBestEffort bool podRequest *schedulercache.Resource - podPorts map[string]bool + podPorts []*v1.ContainerPort //key is a pod full name with the anti-affinity rules. matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm serviceAffinityInUse bool @@ -90,7 +90,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf pod: pod, podBestEffort: isPodBestEffort(pod), podRequest: GetResourceRequest(pod), - podPorts: schedutil.GetUsedPorts(pod), + podPorts: schedutil.GetContainerPorts(pod), matchingAntiAffinityTerms: matchingTerms, } for predicateName, precomputeFunc := range predicateMetadataProducers { @@ -172,10 +172,7 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata { podRequest: meta.podRequest, serviceAffinityInUse: meta.serviceAffinityInUse, } - newPredMeta.podPorts = map[string]bool{} - for k, v := range meta.podPorts { - newPredMeta.podPorts[k] = v - } + newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...) newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{} for k, v := range meta.matchingAntiAffinityTerms { newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...) diff --git a/pkg/scheduler/algorithm/predicates/metadata_test.go b/pkg/scheduler/algorithm/predicates/metadata_test.go index 31b88411015..026a979d980 100644 --- a/pkg/scheduler/algorithm/predicates/metadata_test.go +++ b/pkg/scheduler/algorithm/predicates/metadata_test.go @@ -373,7 +373,15 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) { Memory: 300, AllowedPodNumber: 4, }, - podPorts: map[string]bool{"1234": true, "456": false}, + podPorts: []*v1.ContainerPort{ + { + Name: "name", + HostPort: 10, + ContainerPort: 20, + Protocol: "TCP", + HostIP: "1.2.3.4", + }, + }, matchingAntiAffinityTerms: map[string][]matchingPodAntiAffinityTerm{ "term1": { { diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index 48d7b509285..bd7d349d084 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -966,12 +966,12 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta algorithm.Predi // PodFitsHostPorts checks if a node has free ports for the requested pod ports. func PodFitsHostPorts(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { - var wantPorts map[string]bool + var wantPorts []*v1.ContainerPort if predicateMeta, ok := meta.(*predicateMetadata); ok { wantPorts = predicateMeta.podPorts } else { // We couldn't parse metadata - fallback to computing it. - wantPorts = schedutil.GetUsedPorts(pod) + wantPorts = schedutil.GetContainerPorts(pod) } if len(wantPorts) == 0 { return true, nil, nil diff --git a/pkg/scheduler/algorithm/predicates/predicates_test.go b/pkg/scheduler/algorithm/predicates/predicates_test.go index 1b05d9e4fde..674240477fc 100644 --- a/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -20,6 +20,7 @@ import ( "os" "reflect" "strconv" + "strings" "testing" "k8s.io/api/core/v1" @@ -32,7 +33,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/schedulercache" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" - schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) var ( @@ -518,13 +518,13 @@ func TestPodFitsHost(t *testing.T) { func newPod(host string, hostPortInfos ...string) *v1.Pod { networkPorts := []v1.ContainerPort{} for _, portInfo := range hostPortInfos { - hostPortInfo := decode(portInfo) - hostPort, _ := strconv.Atoi(hostPortInfo.hostPort) + splited := strings.Split(portInfo, "/") + hostPort, _ := strconv.Atoi(splited[2]) networkPorts = append(networkPorts, v1.ContainerPort{ - HostIP: hostPortInfo.hostIP, + HostIP: splited[1], HostPort: int32(hostPort), - Protocol: v1.Protocol(hostPortInfo.protocol), + Protocol: v1.Protocol(splited[0]), }) } return &v1.Pod{ @@ -653,41 +653,6 @@ func TestPodFitsHostPorts(t *testing.T) { } } -func TestGetUsedPorts(t *testing.T) { - tests := []struct { - pods []*v1.Pod - ports map[string]bool - }{ - { - []*v1.Pod{ - newPod("m1", "UDP/127.0.0.1/9090"), - }, - map[string]bool{"UDP/127.0.0.1/9090": true}, - }, - { - []*v1.Pod{ - newPod("m1", "UDP/127.0.0.1/9090"), - newPod("m1", "UDP/127.0.0.1/9091"), - }, - map[string]bool{"UDP/127.0.0.1/9090": true, "UDP/127.0.0.1/9091": true}, - }, - { - []*v1.Pod{ - newPod("m1", "TCP/0.0.0.0/9090"), - newPod("m2", "UDP/127.0.0.1/9091"), - }, - map[string]bool{"TCP/0.0.0.0/9090": true, "UDP/127.0.0.1/9091": true}, - }, - } - - for _, test := range tests { - ports := schedutil.GetUsedPorts(test.pods...) - if !reflect.DeepEqual(test.ports, ports) { - t.Errorf("%s: expected %v, got %v", "test get used ports", test.ports, ports) - } - } -} - func TestGCEDiskConflicts(t *testing.T) { volState := v1.PodSpec{ Volumes: []v1.Volume{ diff --git a/pkg/scheduler/algorithm/predicates/utils.go b/pkg/scheduler/algorithm/predicates/utils.go index 9a25c85d9ac..ce3e9d5888e 100644 --- a/pkg/scheduler/algorithm/predicates/utils.go +++ b/pkg/scheduler/algorithm/predicates/utils.go @@ -17,10 +17,7 @@ limitations under the License. package predicates import ( - "strings" - "github.com/golang/glog" - "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -135,65 +132,11 @@ type EquivalencePod struct { PVCSet sets.String } -type hostPortInfo struct { - protocol string - hostIP string - hostPort string -} - -// decode decodes string ("protocol/hostIP/hostPort") to *hostPortInfo object. -func decode(info string) *hostPortInfo { - hostPortInfoSlice := strings.Split(info, "/") - - protocol := hostPortInfoSlice[0] - hostIP := hostPortInfoSlice[1] - hostPort := hostPortInfoSlice[2] - - return &hostPortInfo{ - protocol: protocol, - hostIP: hostIP, - hostPort: hostPort, - } -} - -// specialPortConflictCheck detects whether specailHostPort(whose hostIP is 0.0.0.0) is conflict with otherHostPorts. -// return true if we have a conflict. -func specialPortConflictCheck(specialHostPort string, otherHostPorts map[string]bool) bool { - specialHostPortInfo := decode(specialHostPort) - - if specialHostPortInfo.hostIP == schedutil.DefaultBindAllHostIP { - // loop through all the otherHostPorts to see if there exists a conflict - for hostPortItem := range otherHostPorts { - hostPortInfo := decode(hostPortItem) - - // if there exists one hostPortItem which has the same hostPort and protocol with the specialHostPort, that will cause a conflict - if specialHostPortInfo.hostPort == hostPortInfo.hostPort && specialHostPortInfo.protocol == hostPortInfo.protocol { - return true - } - } - - } - - return false -} - // portsConflict check whether existingPorts and wantPorts conflict with each other // return true if we have a conflict -func portsConflict(existingPorts, wantPorts map[string]bool) bool { - - for existingPort := range existingPorts { - if specialPortConflictCheck(existingPort, wantPorts) { - return true - } - } - - for wantPort := range wantPorts { - if specialPortConflictCheck(wantPort, existingPorts) { - return true - } - - // general check hostPort conflict procedure for hostIP is not 0.0.0.0 - if existingPorts[wantPort] { +func portsConflict(existingPorts schedutil.HostPortInfo, wantPorts []*v1.ContainerPort) bool { + for _, cp := range wantPorts { + if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) { return true } } diff --git a/pkg/scheduler/algorithm/predicates/utils_test.go b/pkg/scheduler/algorithm/predicates/utils_test.go index 308bd8da519..305a27d1304 100644 --- a/pkg/scheduler/algorithm/predicates/utils_test.go +++ b/pkg/scheduler/algorithm/predicates/utils_test.go @@ -18,8 +18,6 @@ package predicates import ( "fmt" - "reflect" - "testing" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -70,194 +68,3 @@ func ExampleFindLabelsInSet() { // label1=value1,label2=value2,label3=will_see_this // pod1,pod2, } - -func Test_decode(t *testing.T) { - tests := []struct { - name string - args string - want *hostPortInfo - }{ - { - name: "test1", - args: "UDP/127.0.0.1/80", - want: &hostPortInfo{ - protocol: "UDP", - hostIP: "127.0.0.1", - hostPort: "80", - }, - }, - { - name: "test2", - args: "TCP/127.0.0.1/80", - want: &hostPortInfo{ - protocol: "TCP", - hostIP: "127.0.0.1", - hostPort: "80", - }, - }, - { - name: "test3", - args: "TCP/0.0.0.0/80", - want: &hostPortInfo{ - protocol: "TCP", - hostIP: "0.0.0.0", - hostPort: "80", - }, - }, - } - - for _, tt := range tests { - if got := decode(tt.args); !reflect.DeepEqual(got, tt.want) { - t.Errorf("test name = %v, decode() = %v, want %v", tt.name, got, tt.want) - } - - } -} - -func Test_specialPortConflictCheck(t *testing.T) { - type args struct { - specialHostPort string - otherHostPorts map[string]bool - } - tests := []struct { - name string - args args - want bool - }{ - { - name: "test-1", - args: args{ - specialHostPort: "TCP/0.0.0.0/80", - otherHostPorts: map[string]bool{ - "TCP/127.0.0.2/8080": true, - "TCP/127.0.0.1/80": true, - "UDP/127.0.0.2/8080": true, - }, - }, - want: true, - }, - { - name: "test-2", - args: args{ - specialHostPort: "TCP/0.0.0.0/80", - otherHostPorts: map[string]bool{ - "TCP/127.0.0.2/8080": true, - "UDP/127.0.0.1/80": true, - "UDP/127.0.0.2/8080": true, - }, - }, - want: false, - }, - { - name: "test-3", - args: args{ - specialHostPort: "TCP/0.0.0.0/80", - otherHostPorts: map[string]bool{ - "TCP/127.0.0.2/8080": true, - "TCP/127.0.0.1/8090": true, - "UDP/127.0.0.2/8080": true, - }, - }, - want: false, - }, - { - name: "test-4", - args: args{ - specialHostPort: "TCP/0.0.0.0/80", - otherHostPorts: map[string]bool{ - "UDP/127.0.0.2/8080": true, - "UDP/127.0.0.1/8090": true, - "TCP/127.0.0.2/8080": true, - }, - }, - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := specialPortConflictCheck(tt.args.specialHostPort, tt.args.otherHostPorts); got != tt.want { - t.Errorf("specialPortConflictCheck() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_portsConflict(t *testing.T) { - type args struct { - existingPorts map[string]bool - wantPorts map[string]bool - } - tests := []struct { - name string - args args - want bool - }{ - { - name: "test1", - args: args{ - existingPorts: map[string]bool{ - "UDP/127.0.0.1/8080": true, - }, - wantPorts: map[string]bool{ - "UDP/127.0.0.1/8080": true, - }, - }, - want: true, - }, - { - name: "test2", - args: args{ - existingPorts: map[string]bool{ - "UDP/127.0.0.2/8080": true, - }, - wantPorts: map[string]bool{ - "UDP/127.0.0.1/8080": true, - }, - }, - want: false, - }, - { - name: "test3", - args: args{ - existingPorts: map[string]bool{ - "TCP/127.0.0.1/8080": true, - }, - wantPorts: map[string]bool{ - "UDP/127.0.0.1/8080": true, - }, - }, - want: false, - }, - { - name: "test4", - args: args{ - existingPorts: map[string]bool{ - "TCP/0.0.0.0/8080": true, - }, - wantPorts: map[string]bool{ - "TCP/127.0.0.1/8080": true, - }, - }, - want: true, - }, - { - name: "test5", - args: args{ - existingPorts: map[string]bool{ - "TCP/127.0.0.1/8080": true, - }, - wantPorts: map[string]bool{ - "TCP/0.0.0.0/8080": true, - }, - }, - want: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := portsConflict(tt.args.existingPorts, tt.args.wantPorts); got != tt.want { - t.Errorf("portsConflict() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/scheduler/schedulercache/cache_test.go b/pkg/scheduler/schedulercache/cache_test.go index b5e1243a474..9ed5764be45 100644 --- a/pkg/scheduler/schedulercache/cache_test.go +++ b/pkg/scheduler/schedulercache/cache_test.go @@ -43,6 +43,32 @@ func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *No } } +type hostPortInfoParam struct { + protocol, ip string + port int32 +} + +type hostPortInfoBuilder struct { + inputs []hostPortInfoParam +} + +func newHostPortInfoBuilder() *hostPortInfoBuilder { + return &hostPortInfoBuilder{} +} + +func (b *hostPortInfoBuilder) add(protocol, ip string, port int32) *hostPortInfoBuilder { + b.inputs = append(b.inputs, hostPortInfoParam{protocol, ip, port}) + return b +} + +func (b *hostPortInfoBuilder) build() schedutil.HostPortInfo { + res := make(schedutil.HostPortInfo) + for _, param := range b.inputs { + res.Add(param.ip, param.protocol, param.port) + } + return res +} + // TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated // on node level. func TestAssumePodScheduled(t *testing.T) { @@ -74,7 +100,7 @@ func TestAssumePodScheduled(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), }, }, { pods: []*v1.Pod{testPods[1], testPods[2]}, @@ -89,7 +115,7 @@ func TestAssumePodScheduled(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1], testPods[2]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true, "TCP/127.0.0.1/8080": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), }, }, { // test non-zero request pods: []*v1.Pod{testPods[3]}, @@ -104,7 +130,7 @@ func TestAssumePodScheduled(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[3]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), }, }, { pods: []*v1.Pod{testPods[4]}, @@ -120,7 +146,7 @@ func TestAssumePodScheduled(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[4]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), }, }, { pods: []*v1.Pod{testPods[4], testPods[5]}, @@ -136,7 +162,7 @@ func TestAssumePodScheduled(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[4], testPods[5]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true, "TCP/127.0.0.1/8080": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), }, }, { pods: []*v1.Pod{testPods[6]}, @@ -151,7 +177,7 @@ func TestAssumePodScheduled(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[6]}, - usedPorts: map[string]bool{}, + usedPorts: newHostPortInfoBuilder().build(), }, }, } @@ -227,7 +253,7 @@ func TestExpirePod(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), }, }} @@ -276,7 +302,7 @@ func TestAddPodWillConfirm(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), }, }} @@ -331,7 +357,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{updatedPod.DeepCopy()}, - usedPorts: map[string]bool{"TCP/0.0.0.0/90": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(), }, }, }} @@ -383,7 +409,7 @@ func TestAddPodAfterExpiration(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{basePod}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), }, }} @@ -436,7 +462,7 @@ func TestUpdatePod(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), }, { requestedResource: &Resource{ MilliCPU: 100, @@ -448,7 +474,7 @@ func TestUpdatePod(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), }}, }} @@ -503,7 +529,7 @@ func TestExpireAddUpdatePod(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), }, { requestedResource: &Resource{ MilliCPU: 100, @@ -515,7 +541,7 @@ func TestExpireAddUpdatePod(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), }}, }} @@ -569,7 +595,7 @@ func TestRemovePod(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{basePod}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true}, + usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), }, }} @@ -672,7 +698,7 @@ func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *NodeInfo { expected.pods = append(expected.pods, pod) expected.requestedResource.Add(getResourceRequest(pod)) expected.nonzeroRequest.Add(getResourceRequest(pod)) - expected.usedPorts = schedutil.GetUsedPorts(pod) + expected.updateUsedPorts(pod, true) expected.generation++ } diff --git a/pkg/scheduler/schedulercache/node_info.go b/pkg/scheduler/schedulercache/node_info.go index c59a2ebd686..2974206cc69 100644 --- a/pkg/scheduler/schedulercache/node_info.go +++ b/pkg/scheduler/schedulercache/node_info.go @@ -38,7 +38,7 @@ type NodeInfo struct { pods []*v1.Pod podsWithAffinity []*v1.Pod - usedPorts map[string]bool + usedPorts util.HostPortInfo // Total requested resource of all pods on this node. // It includes assumed pods which scheduler sends binding to apiserver but @@ -164,7 +164,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo { nonzeroRequest: &Resource{}, allocatableResource: &Resource{}, generation: 0, - usedPorts: make(map[string]bool), + usedPorts: make(util.HostPortInfo), } for _, pod := range pods { ni.AddPod(pod) @@ -188,7 +188,7 @@ func (n *NodeInfo) Pods() []*v1.Pod { return n.pods } -func (n *NodeInfo) UsedPorts() map[string]bool { +func (n *NodeInfo) UsedPorts() util.HostPortInfo { if n == nil { return nil } @@ -269,7 +269,7 @@ func (n *NodeInfo) Clone() *NodeInfo { taintsErr: n.taintsErr, memoryPressureCondition: n.memoryPressureCondition, diskPressureCondition: n.diskPressureCondition, - usedPorts: make(map[string]bool), + usedPorts: make(util.HostPortInfo), generation: n.generation, } if len(n.pods) > 0 { @@ -400,34 +400,15 @@ func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int6 return } -func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, used bool) { +func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) { for j := range pod.Spec.Containers { container := &pod.Spec.Containers[j] for k := range container.Ports { podPort := &container.Ports[k] - // "0" is explicitly ignored in PodFitsHostPorts, - // which is the only function that uses this value. - if podPort.HostPort != 0 { - // user does not explicitly set protocol, default is tcp - portProtocol := podPort.Protocol - if podPort.Protocol == "" { - portProtocol = v1.ProtocolTCP - } - - // user does not explicitly set hostIP, default is 0.0.0.0 - portHostIP := podPort.HostIP - if podPort.HostIP == "" { - portHostIP = util.DefaultBindAllHostIP - } - - str := fmt.Sprintf("%s/%s/%d", portProtocol, portHostIP, podPort.HostPort) - - if used { - n.usedPorts[str] = used - } else { - delete(n.usedPorts, str) - } - + if add { + n.usedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort) + } else { + n.usedPorts.Remove(podPort.HostIP, string(podPort.Protocol), podPort.HostPort) } } } diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go index 66a004e0013..6da6d5ec976 100644 --- a/pkg/scheduler/util/utils.go +++ b/pkg/scheduler/util/utils.go @@ -17,7 +17,6 @@ limitations under the License. package util import ( - "fmt" "sort" "k8s.io/api/core/v1" @@ -28,33 +27,126 @@ import ( const DefaultBindAllHostIP = "0.0.0.0" -// GetUsedPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair +// ProtocolPort represents a protocol port pair, e.g. tcp:80. +type ProtocolPort struct { + Protocol string + Port int32 +} + +// NewProtocolPort creates a ProtocolPort instance. +func NewProtocolPort(protocol string, port int32) *ProtocolPort { + pp := &ProtocolPort{ + Protocol: protocol, + Port: port, + } + + if len(pp.Protocol) == 0 { + pp.Protocol = string(v1.ProtocolTCP) + } + + return pp +} + +// HostPortInfo stores mapping from ip to a set of ProtocolPort +type HostPortInfo map[string]map[ProtocolPort]struct{} + +// Add adds (ip, protocol, port) to HostPortInfo +func (h HostPortInfo) Add(ip, protocol string, port int32) { + if port <= 0 { + return + } + + h.sanitize(&ip, &protocol) + + pp := NewProtocolPort(protocol, port) + if _, ok := h[ip]; !ok { + h[ip] = map[ProtocolPort]struct{}{ + *pp: {}, + } + return + } + + h[ip][*pp] = struct{}{} +} + +// Remove removes (ip, protocol, port) from HostPortInfo +func (h HostPortInfo) Remove(ip, protocol string, port int32) { + if port <= 0 { + return + } + + h.sanitize(&ip, &protocol) + + pp := NewProtocolPort(protocol, port) + if m, ok := h[ip]; ok { + delete(m, *pp) + if len(h[ip]) == 0 { + delete(h, ip) + } + } +} + +// Len returns the total number of (ip, protocol, port) tuple in HostPortInfo +func (h HostPortInfo) Len() int { + length := 0 + for _, m := range h { + length += len(m) + } + return length +} + +// CheckConflict checks if the input (ip, protocol, port) conflicts with the existing +// ones in HostPortInfo. +func (h HostPortInfo) CheckConflict(ip, protocol string, port int32) bool { + if port <= 0 { + return false + } + + h.sanitize(&ip, &protocol) + + pp := NewProtocolPort(protocol, port) + + // If ip is 0.0.0.0 check all IP's (protocol, port) pair + if ip == DefaultBindAllHostIP { + for _, m := range h { + if _, ok := m[*pp]; ok { + return true + } + } + return false + } + + // If ip isn't 0.0.0.0, only check IP and 0.0.0.0's (protocol, port) pair + for _, key := range []string{DefaultBindAllHostIP, ip} { + if m, ok := h[key]; ok { + if _, ok2 := m[*pp]; ok2 { + return true + } + } + } + + return false +} + +// sanitize the parameters +func (h HostPortInfo) sanitize(ip, protocol *string) { + if len(*ip) == 0 { + *ip = DefaultBindAllHostIP + } + if len(*protocol) == 0 { + *protocol = string(v1.ProtocolTCP) + } +} + +// GetContainerPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair // will be in the result; but it does not resolve port conflict. -func GetUsedPorts(pods ...*v1.Pod) map[string]bool { - ports := make(map[string]bool) +func GetContainerPorts(pods ...*v1.Pod) []*v1.ContainerPort { + var ports []*v1.ContainerPort for _, pod := range pods { for j := range pod.Spec.Containers { container := &pod.Spec.Containers[j] for k := range container.Ports { - podPort := &container.Ports[k] - // "0" is explicitly ignored in PodFitsHostPorts, - // which is the only function that uses this value. - if podPort.HostPort != 0 { - // user does not explicitly set protocol, default is tcp - portProtocol := podPort.Protocol - if podPort.Protocol == "" { - portProtocol = v1.ProtocolTCP - } - - // user does not explicitly set hostIP, default is 0.0.0.0 - portHostIP := podPort.HostIP - if podPort.HostIP == "" { - portHostIP = "0.0.0.0" - } - - str := fmt.Sprintf("%s/%s/%d", portProtocol, portHostIP, podPort.HostPort) - ports[str] = true - } + ports = append(ports, &container.Ports[k]) } } } diff --git a/pkg/scheduler/util/utils_test.go b/pkg/scheduler/util/utils_test.go index 653c3b9b0d6..a39651357cf 100644 --- a/pkg/scheduler/util/utils_test.go +++ b/pkg/scheduler/util/utils_test.go @@ -93,3 +93,213 @@ func TestSortableList(t *testing.T) { } } } + +type hostPortInfoParam struct { + protocol, ip string + port int32 +} + +func TestHostPortInfo_AddRemove(t *testing.T) { + tests := []struct { + desc string + added []hostPortInfoParam + removed []hostPortInfoParam + length int + }{ + { + desc: "normal add case", + added: []hostPortInfoParam{ + {"TCP", "127.0.0.1", 79}, + {"UDP", "127.0.0.1", 80}, + {"TCP", "127.0.0.1", 81}, + {"TCP", "127.0.0.1", 82}, + // this might not make sense in real case, but the struct doesn't forbid it. + {"TCP", "0.0.0.0", 79}, + {"UDP", "0.0.0.0", 80}, + {"TCP", "0.0.0.0", 81}, + {"TCP", "0.0.0.0", 82}, + {"TCP", "0.0.0.0", 0}, + {"TCP", "0.0.0.0", -1}, + }, + length: 8, + }, + { + desc: "empty ip and protocol add should work", + added: []hostPortInfoParam{ + {"", "127.0.0.1", 79}, + {"UDP", "127.0.0.1", 80}, + {"", "127.0.0.1", 81}, + {"", "127.0.0.1", 82}, + {"", "", 79}, + {"UDP", "", 80}, + {"", "", 81}, + {"", "", 82}, + {"", "", 0}, + {"", "", -1}, + }, + length: 8, + }, + { + desc: "normal remove case", + added: []hostPortInfoParam{ + {"TCP", "127.0.0.1", 79}, + {"UDP", "127.0.0.1", 80}, + {"TCP", "127.0.0.1", 81}, + {"TCP", "127.0.0.1", 82}, + {"TCP", "0.0.0.0", 79}, + {"UDP", "0.0.0.0", 80}, + {"TCP", "0.0.0.0", 81}, + {"TCP", "0.0.0.0", 82}, + }, + removed: []hostPortInfoParam{ + {"TCP", "127.0.0.1", 79}, + {"UDP", "127.0.0.1", 80}, + {"TCP", "127.0.0.1", 81}, + {"TCP", "127.0.0.1", 82}, + {"TCP", "0.0.0.0", 79}, + {"UDP", "0.0.0.0", 80}, + {"TCP", "0.0.0.0", 81}, + {"TCP", "0.0.0.0", 82}, + }, + length: 0, + }, + { + desc: "empty ip and protocol remove should work", + added: []hostPortInfoParam{ + {"TCP", "127.0.0.1", 79}, + {"UDP", "127.0.0.1", 80}, + {"TCP", "127.0.0.1", 81}, + {"TCP", "127.0.0.1", 82}, + {"TCP", "0.0.0.0", 79}, + {"UDP", "0.0.0.0", 80}, + {"TCP", "0.0.0.0", 81}, + {"TCP", "0.0.0.0", 82}, + }, + removed: []hostPortInfoParam{ + {"", "127.0.0.1", 79}, + {"", "127.0.0.1", 81}, + {"", "127.0.0.1", 82}, + {"UDP", "127.0.0.1", 80}, + {"", "", 79}, + {"", "", 81}, + {"", "", 82}, + {"UDP", "", 80}, + }, + length: 0, + }, + } + + for _, test := range tests { + hp := make(HostPortInfo) + for _, param := range test.added { + hp.Add(param.ip, param.protocol, param.port) + } + for _, param := range test.removed { + hp.Remove(param.ip, param.protocol, param.port) + } + if hp.Len() != test.length { + t.Errorf("%v failed: expect length %d; got %d", test.desc, test.length, hp.Len()) + t.Error(hp) + } + } +} + +func TestHostPortInfo_Check(t *testing.T) { + tests := []struct { + desc string + added []hostPortInfoParam + check hostPortInfoParam + expect bool + }{ + { + desc: "empty check should check 0.0.0.0 and TCP", + added: []hostPortInfoParam{ + {"TCP", "127.0.0.1", 80}, + }, + check: hostPortInfoParam{"", "", 81}, + expect: false, + }, + { + desc: "empty check should check 0.0.0.0 and TCP (conflicted)", + added: []hostPortInfoParam{ + {"TCP", "127.0.0.1", 80}, + }, + check: hostPortInfoParam{"", "", 80}, + expect: true, + }, + { + desc: "empty port check should pass", + added: []hostPortInfoParam{ + {"TCP", "127.0.0.1", 80}, + }, + check: hostPortInfoParam{"", "", 0}, + expect: false, + }, + { + desc: "0.0.0.0 should check all registered IPs", + added: []hostPortInfoParam{ + {"TCP", "127.0.0.1", 80}, + }, + check: hostPortInfoParam{"TCP", "0.0.0.0", 80}, + expect: true, + }, + { + desc: "0.0.0.0 with different protocol should be allowed", + added: []hostPortInfoParam{ + {"UDP", "127.0.0.1", 80}, + }, + check: hostPortInfoParam{"TCP", "0.0.0.0", 80}, + expect: false, + }, + { + desc: "0.0.0.0 with different port should be allowed", + added: []hostPortInfoParam{ + {"TCP", "127.0.0.1", 79}, + {"TCP", "127.0.0.1", 81}, + {"TCP", "127.0.0.1", 82}, + }, + check: hostPortInfoParam{"TCP", "0.0.0.0", 80}, + expect: false, + }, + { + desc: "normal ip should check all registered 0.0.0.0", + added: []hostPortInfoParam{ + {"TCP", "0.0.0.0", 80}, + }, + check: hostPortInfoParam{"TCP", "127.0.0.1", 80}, + expect: true, + }, + { + desc: "normal ip with different port/protocol should be allowed (0.0.0.0)", + added: []hostPortInfoParam{ + {"TCP", "0.0.0.0", 79}, + {"UDP", "0.0.0.0", 80}, + {"TCP", "0.0.0.0", 81}, + {"TCP", "0.0.0.0", 82}, + }, + check: hostPortInfoParam{"TCP", "127.0.0.1", 80}, + expect: false, + }, + { + desc: "normal ip with different port/protocol should be allowed", + added: []hostPortInfoParam{ + {"TCP", "127.0.0.1", 79}, + {"UDP", "127.0.0.1", 80}, + {"TCP", "127.0.0.1", 81}, + {"TCP", "127.0.0.1", 82}, + }, + check: hostPortInfoParam{"TCP", "127.0.0.1", 80}, + expect: false, + }, + } + + for _, test := range tests { + hp := make(HostPortInfo) + for _, param := range test.added { + hp.Add(param.ip, param.protocol, param.port) + } + if hp.CheckConflict(test.check.ip, test.check.protocol, test.check.port) != test.expect { + t.Errorf("%v failed, expected %t; got %t", test.desc, test.expect, !test.expect) + } + } +}