From 954c97fe6dbbd786dd1e662758087dafe0103f68 Mon Sep 17 00:00:00 2001 From: chenxingyu Date: Wed, 11 Oct 2017 11:45:25 +0800 Subject: [PATCH] add e2e test on the hostport predicates --- .../algorithm/predicates/predicates.go | 39 +---- .../scheduler/algorithm/predicates/utils.go | 48 +++++- .../algorithm/predicates/utils_test.go | 148 ++++++++++++++++++ plugin/pkg/scheduler/scheduler.go | 1 - plugin/pkg/scheduler/scheduler_test.go | 8 +- .../scheduler/schedulercache/cache_test.go | 6 +- .../pkg/scheduler/schedulercache/node_info.go | 2 +- plugin/pkg/scheduler/util/utils.go | 2 + test/e2e/scheduling/predicates.go | 69 ++++++++ 9 files changed, 277 insertions(+), 46 deletions(-) diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 9d75bf5c8f2..2291ae44876 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -903,42 +903,9 @@ func PodFitsHostPorts(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *s existingPorts := nodeInfo.UsedPorts() - // try to see whether two hostPorts will conflict or not - for existingPort := range existingPorts { - existingHostPortInfo := decode(existingPort) - - if existingHostPortInfo.hostIP == "0.0.0.0" { - // loop through all the want hostPort to see if there exists a conflict - for wantPort := range wantPorts { - wantHostPortInfo := decode(wantPort) - - // if there already exists one hostPort whose hostIP is 0.0.0.0, then the other want hostport (which has the same protocol and port) will not fit - if wantHostPortInfo.hostPort == existingHostPortInfo.hostPort && wantHostPortInfo.protocol == existingHostPortInfo.protocol { - return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil - } - } - } - } - - for wantPort := range wantPorts { - wantHostPortInfo := decode(wantPort) - - if wantHostPortInfo.hostIP == "0.0.0.0" { - // loop through all the existing hostPort to see if there exists a conflict - for existingPort := range existingPorts { - existingHostPortInfo := decode(existingPort) - - // if there already exists one hostPort whose hostIP may be 127.0.0.1, then a hostPort (which wants 0.0.0.0 hostIP and has the same protocol and port) will not fit - if wantHostPortInfo.hostPort == existingHostPortInfo.hostPort && wantHostPortInfo.protocol == existingHostPortInfo.protocol { - return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil - } - } - } else { - // general check hostPort conflict procedure for hostIP is not 0.0.0.0 - if wantPort != "" && existingPorts[wantPort] { - return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil - } - } + // try to see whether existingPorts and wantPorts will conflict or not + if portsConflict(existingPorts, wantPorts) { + return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil } return true, nil, nil diff --git a/plugin/pkg/scheduler/algorithm/predicates/utils.go b/plugin/pkg/scheduler/algorithm/predicates/utils.go index 5e115575caa..d51f6cd633b 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/utils.go +++ b/plugin/pkg/scheduler/algorithm/predicates/utils.go @@ -22,6 +22,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util" ) // FindLabelsInSet gets as many key/value pairs as possible out of a label set. @@ -98,7 +99,7 @@ type hostPortInfo struct { hostPort string } -// decode a string ("protocol/hostIP/hostPort") to *hostPortInfo object +// decode decodes string ("protocol/hostIP/hostPort") to *hostPortInfo object. func decode(info string) *hostPortInfo { hostPortInfoSlice := strings.Split(info, "/") @@ -112,3 +113,48 @@ func decode(info string) *hostPortInfo { hostPort: hostPort, } } + +// specialPortConflictCheck detects whether specailHostPort(whose hostIP is 0.0.0.0) is conflict with otherHostPorts. +// return true if we have a conflict. +func specialPortConflictCheck(specialHostPort string, otherHostPorts map[string]bool) bool { + specialHostPortInfo := decode(specialHostPort) + + if specialHostPortInfo.hostIP == schedutil.DefaultBindAllHostIP { + // loop through all the otherHostPorts to see if there exists a conflict + for hostPortItem := range otherHostPorts { + hostPortInfo := decode(hostPortItem) + + // if there exists one hostPortItem which has the same hostPort and protocol with the specialHostPort, that will cause a conflict + if specialHostPortInfo.hostPort == hostPortInfo.hostPort && specialHostPortInfo.protocol == hostPortInfo.protocol { + return true + } + } + + } + + return false +} + +// portsConflict check whether existingPorts and wantPorts conflict with each other +// return true if we have a conflict +func portsConflict(existingPorts, wantPorts map[string]bool) bool { + + for existingPort := range existingPorts { + if specialPortConflictCheck(existingPort, wantPorts) { + return true + } + } + + for wantPort := range wantPorts { + if specialPortConflictCheck(wantPort, existingPorts) { + return true + } + + // general check hostPort conflict procedure for hostIP is not 0.0.0.0 + if existingPorts[wantPort] { + return true + } + } + + return false +} diff --git a/plugin/pkg/scheduler/algorithm/predicates/utils_test.go b/plugin/pkg/scheduler/algorithm/predicates/utils_test.go index 00e35a6cb7f..308bd8da519 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/utils_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/utils_test.go @@ -113,3 +113,151 @@ func Test_decode(t *testing.T) { } } + +func Test_specialPortConflictCheck(t *testing.T) { + type args struct { + specialHostPort string + otherHostPorts map[string]bool + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "test-1", + args: args{ + specialHostPort: "TCP/0.0.0.0/80", + otherHostPorts: map[string]bool{ + "TCP/127.0.0.2/8080": true, + "TCP/127.0.0.1/80": true, + "UDP/127.0.0.2/8080": true, + }, + }, + want: true, + }, + { + name: "test-2", + args: args{ + specialHostPort: "TCP/0.0.0.0/80", + otherHostPorts: map[string]bool{ + "TCP/127.0.0.2/8080": true, + "UDP/127.0.0.1/80": true, + "UDP/127.0.0.2/8080": true, + }, + }, + want: false, + }, + { + name: "test-3", + args: args{ + specialHostPort: "TCP/0.0.0.0/80", + otherHostPorts: map[string]bool{ + "TCP/127.0.0.2/8080": true, + "TCP/127.0.0.1/8090": true, + "UDP/127.0.0.2/8080": true, + }, + }, + want: false, + }, + { + name: "test-4", + args: args{ + specialHostPort: "TCP/0.0.0.0/80", + otherHostPorts: map[string]bool{ + "UDP/127.0.0.2/8080": true, + "UDP/127.0.0.1/8090": true, + "TCP/127.0.0.2/8080": true, + }, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := specialPortConflictCheck(tt.args.specialHostPort, tt.args.otherHostPorts); got != tt.want { + t.Errorf("specialPortConflictCheck() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_portsConflict(t *testing.T) { + type args struct { + existingPorts map[string]bool + wantPorts map[string]bool + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "test1", + args: args{ + existingPorts: map[string]bool{ + "UDP/127.0.0.1/8080": true, + }, + wantPorts: map[string]bool{ + "UDP/127.0.0.1/8080": true, + }, + }, + want: true, + }, + { + name: "test2", + args: args{ + existingPorts: map[string]bool{ + "UDP/127.0.0.2/8080": true, + }, + wantPorts: map[string]bool{ + "UDP/127.0.0.1/8080": true, + }, + }, + want: false, + }, + { + name: "test3", + args: args{ + existingPorts: map[string]bool{ + "TCP/127.0.0.1/8080": true, + }, + wantPorts: map[string]bool{ + "UDP/127.0.0.1/8080": true, + }, + }, + want: false, + }, + { + name: "test4", + args: args{ + existingPorts: map[string]bool{ + "TCP/0.0.0.0/8080": true, + }, + wantPorts: map[string]bool{ + "TCP/127.0.0.1/8080": true, + }, + }, + want: true, + }, + { + name: "test5", + args: args{ + existingPorts: map[string]bool{ + "TCP/127.0.0.1/8080": true, + }, + wantPorts: map[string]bool{ + "TCP/0.0.0.0/8080": true, + }, + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := portsConflict(tt.args.existingPorts, tt.args.wantPorts); got != tt.want { + t.Errorf("portsConflict() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index df18fba450c..c584bc6cb87 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -330,7 +330,6 @@ func (sched *Scheduler) scheduleOne() { if err != nil { return } - // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). go func() { err := sched.bind(&assumedPod, &v1.Binding{ diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index ddb9d39df55..07f769f9cba 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -256,7 +256,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { case <-waitPodExpireChan: case <-time.After(wait.ForeverTestTimeout): close(timeout) - t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + t.Fatalf("timeout timeout in waiting pod expire after %v", wait.ForeverTestTimeout) } // We use conflicted pod ports to incur fit predicate failure if first pod not removed. @@ -273,7 +273,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { t.Errorf("binding want=%v, get=%v", expectBinding, b) } case <-time.After(wait.ForeverTestTimeout): - t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout) } } @@ -307,7 +307,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { t.Errorf("err want=%v, get=%v", expectErr, err) } case <-time.After(wait.ForeverTestTimeout): - t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + t.Fatalf("timeout in fitting after %v", wait.ForeverTestTimeout) } // We mimic the workflow of cache behavior when a pod is removed by user. @@ -334,7 +334,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { t.Errorf("binding want=%v, get=%v", expectBinding, b) } case <-time.After(wait.ForeverTestTimeout): - t.Fatalf("timeout after %v", wait.ForeverTestTimeout) + t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout) } } diff --git a/plugin/pkg/scheduler/schedulercache/cache_test.go b/plugin/pkg/scheduler/schedulercache/cache_test.go index cde40585c31..e3f8a75d745 100644 --- a/plugin/pkg/scheduler/schedulercache/cache_test.go +++ b/plugin/pkg/scheduler/schedulercache/cache_test.go @@ -228,7 +228,7 @@ func TestExpirePod(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": false, "TCP/127.0.0.1/8080": true}, + usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true}, }, }} @@ -277,7 +277,7 @@ func TestAddPodWillConfirm(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, - usedPorts: map[string]bool{"TCP/127.0.0.1/80": true, "TCP/127.0.0.1/8080": false}, + usedPorts: map[string]bool{"TCP/127.0.0.1/80": true}, }, }} @@ -332,7 +332,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) { }, allocatableResource: &Resource{}, pods: []*v1.Pod{updatedPod.DeepCopy()}, - usedPorts: map[int]bool{90: true}, + usedPorts: map[string]bool{"TCP/0.0.0.0/90": true}, }, }, }} diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index 37eefeb288d..13f71d525a0 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -417,7 +417,7 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, used bool) { // user does not explicitly set hostIP, default is 0.0.0.0 portHostIP := podPort.HostIP if podPort.HostIP == "" { - portHostIP = "0.0.0.0" + portHostIP = util.DefaultBindAllHostIP } str := fmt.Sprintf("%s/%s/%d", portProtocol, portHostIP, podPort.HostPort) diff --git a/plugin/pkg/scheduler/util/utils.go b/plugin/pkg/scheduler/util/utils.go index c4193d626ff..ce1d1ad992a 100644 --- a/plugin/pkg/scheduler/util/utils.go +++ b/plugin/pkg/scheduler/util/utils.go @@ -24,6 +24,8 @@ import ( "k8s.io/kubernetes/pkg/apis/scheduling" ) +const DefaultBindAllHostIP = "0.0.0.0" + // GetUsedPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair // will be in the result; but it does not resolve port conflict. func GetUsedPorts(pods ...*v1.Pod) map[string]bool { diff --git a/test/e2e/scheduling/predicates.go b/test/e2e/scheduling/predicates.go index 19044d12c89..4822aa0ba56 100644 --- a/test/e2e/scheduling/predicates.go +++ b/test/e2e/scheduling/predicates.go @@ -592,6 +592,54 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() { WaitForSchedulerAfterAction(f, removeTaintFromNodeAction(cs, nodeName, testTaint), podNameNoTolerations, true) verifyResult(cs, 1, 0, ns) }) + + It("validates that there is no conflict between pods with same hostPort but different hostIP and protocol", func() { + + nodeName := GetNodeThatCanRunPod(f) + + // use nodeSelector to make sure the testing pods get assigned on the same node to explicitly verify there exists conflict or not + By("Trying to apply a random label on the found node.") + k := fmt.Sprintf("kubernetes.io/e2e-%s", string(uuid.NewUUID())) + v := "90" + + nodeSelector := make(map[string]string) + nodeSelector[k] = v + + framework.AddOrUpdateLabelOnNode(cs, nodeName, k, v) + framework.ExpectNodeHasLabel(cs, nodeName, k, v) + defer framework.RemoveLabelOffNode(cs, nodeName, k) + + By("Trying to create a pod(pod1) with hostport 80 and hostIP 127.0.0.1 and expect scheduled") + creatHostPortPodOnNode(f, "pod1", ns, "127.0.0.1", v1.ProtocolTCP, nodeSelector, true) + + By("Trying to create another pod(pod2) with hostport 80 but hostIP 127.0.0.2 on the node which pod1 resides and expect scheduled") + creatHostPortPodOnNode(f, "pod2", ns, "127.0.0.2", v1.ProtocolTCP, nodeSelector, true) + + By("Trying to create a third pod(pod3) with hostport 80, hostIP 127.0.0.2 but use UDP protocol on the node which pod2 resides") + creatHostPortPodOnNode(f, "pod3", ns, "127.0.0.2", v1.ProtocolUDP, nodeSelector, true) + }) + + It("validates that there exists conflict between pods with same hostPort and protocol but one using 0.0.0.0 hostIP", func() { + nodeName := GetNodeThatCanRunPod(f) + + // use nodeSelector to make sure the testing pods get assigned on the same node to explicitly verify there exists conflict or not + By("Trying to apply a random label on the found node.") + k := fmt.Sprintf("kubernetes.io/e2e-%s", string(uuid.NewUUID())) + v := "95" + + nodeSelector := make(map[string]string) + nodeSelector[k] = v + + framework.AddOrUpdateLabelOnNode(cs, nodeName, k, v) + framework.ExpectNodeHasLabel(cs, nodeName, k, v) + defer framework.RemoveLabelOffNode(cs, nodeName, k) + + By("Trying to create a pod(pod4) with hostport 80 and hostIP 0.0.0.0(empty string here) and expect scheduled") + creatHostPortPodOnNode(f, "pod4", ns, "", v1.ProtocolTCP, nodeSelector, true) + + By("Trying to create another pod(pod5) with hostport 80 but hostIP 127.0.0.1 on the node which pod4 resides and expect not scheduled") + creatHostPortPodOnNode(f, "pod5", ns, "127.0.0.1", v1.ProtocolTCP, nodeSelector, false) + }) }) func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod { @@ -782,3 +830,24 @@ func CreateHostPortPods(f *framework.Framework, id string, replicas int, expectR framework.ExpectNoError(err) } } + +// create pod which using hostport on the specified node according to the nodeSelector +func creatHostPortPodOnNode(f *framework.Framework, podName, ns, hostIP string, protocol v1.Protocol, nodeSelector map[string]string, expectScheduled bool) { + createPausePod(f, pausePodConfig{ + Name: podName, + Ports: []v1.ContainerPort{ + { + HostPort: 80, + ContainerPort: 80, + Protocol: protocol, + HostIP: hostIP, + }, + }, + NodeSelector: nodeSelector, + }) + + err := framework.WaitForPodNotPending(f.ClientSet, ns, podName) + if expectScheduled { + framework.ExpectNoError(err) + } +}