Moves e2e service util functions into service_util.go and cleans up test codes

This commit is contained in:
Zihong Zheng 2016-12-29 15:35:47 -08:00
parent 3951ae4e1d
commit e5944f56dc
18 changed files with 1812 additions and 1780 deletions

View File

@ -142,6 +142,7 @@ go_library(
"//pkg/client/transport:go_default_library", "//pkg/client/transport:go_default_library",
"//pkg/client/unversioned/clientcmd:go_default_library", "//pkg/client/unversioned/clientcmd:go_default_library",
"//pkg/client/unversioned/clientcmd/api:go_default_library", "//pkg/client/unversioned/clientcmd/api:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library", "//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",

View File

@ -146,8 +146,8 @@ func testService(f *framework.Framework, sem *chaosmonkey.Semaphore, testDuringD
// Setup // Setup
serviceName := "service-test" serviceName := "service-test"
jig := NewServiceTestJig(f.ClientSet, serviceName) jig := framework.NewServiceTestJig(f.ClientSet, serviceName)
// nodeIP := pickNodeIP(jig.Client) // for later // nodeIP := framework.PickNodeIP(jig.Client) // for later
By("creating a TCP service " + serviceName + " with type=LoadBalancer in namespace " + f.Namespace.Name) By("creating a TCP service " + serviceName + " with type=LoadBalancer in namespace " + f.Namespace.Name)
// TODO it's weird that we have to do this and then wait WaitForLoadBalancer which changes // TODO it's weird that we have to do this and then wait WaitForLoadBalancer which changes
@ -155,11 +155,11 @@ func testService(f *framework.Framework, sem *chaosmonkey.Semaphore, testDuringD
tcpService := jig.CreateTCPServiceOrFail(f.Namespace.Name, func(s *v1.Service) { tcpService := jig.CreateTCPServiceOrFail(f.Namespace.Name, func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeLoadBalancer s.Spec.Type = v1.ServiceTypeLoadBalancer
}) })
tcpService = jig.WaitForLoadBalancerOrFail(f.Namespace.Name, tcpService.Name, loadBalancerCreateTimeoutDefault) tcpService = jig.WaitForLoadBalancerOrFail(f.Namespace.Name, tcpService.Name, framework.LoadBalancerCreateTimeoutDefault)
jig.SanityCheckService(tcpService, v1.ServiceTypeLoadBalancer) jig.SanityCheckService(tcpService, v1.ServiceTypeLoadBalancer)
// Get info to hit it with // Get info to hit it with
tcpIngressIP := getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) tcpIngressIP := framework.GetIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
svcPort := int(tcpService.Spec.Ports[0].Port) svcPort := int(tcpService.Spec.Ports[0].Port)
By("creating pod to be part of service " + serviceName) By("creating pod to be part of service " + serviceName)
@ -169,7 +169,7 @@ func testService(f *framework.Framework, sem *chaosmonkey.Semaphore, testDuringD
// Hit it once before considering ourselves ready // Hit it once before considering ourselves ready
By("hitting the pod through the service's LoadBalancer") By("hitting the pod through the service's LoadBalancer")
jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeoutDefault) jig.TestReachableHTTP(tcpIngressIP, svcPort, framework.LoadBalancerLagTimeoutDefault)
sem.Ready() sem.Ready()
@ -187,7 +187,7 @@ func testService(f *framework.Framework, sem *chaosmonkey.Semaphore, testDuringD
// Sanity check and hit it once more // Sanity check and hit it once more
By("hitting the pod through the service's LoadBalancer") By("hitting the pod through the service's LoadBalancer")
jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeoutDefault) jig.TestReachableHTTP(tcpIngressIP, svcPort, framework.LoadBalancerLagTimeoutDefault)
jig.SanityCheckService(tcpService, v1.ServiceTypeLoadBalancer) jig.SanityCheckService(tcpService, v1.ServiceTypeLoadBalancer)
} }

View File

@ -302,7 +302,7 @@ var _ = framework.KubeDescribe("DaemonRestart [Disruptive]", func() {
It("Kubelet should not restart containers across restart", func() { It("Kubelet should not restart containers across restart", func() {
nodeIPs, err := getNodePublicIps(f.ClientSet) nodeIPs, err := framework.GetNodePublicIps(f.ClientSet)
framework.ExpectNoError(err) framework.ExpectNoError(err)
preRestarts, badNodes := getContainerRestarts(f.ClientSet, ns, labelSelector) preRestarts, badNodes := getContainerRestarts(f.ClientSet, ns, labelSelector)
if preRestarts != 0 { if preRestarts != 0 {

View File

@ -35,8 +35,11 @@ import (
// schedulingTimeout is longer specifically because sometimes we need to wait // schedulingTimeout is longer specifically because sometimes we need to wait
// awhile to guarantee that we've been patient waiting for something ordinary // awhile to guarantee that we've been patient waiting for something ordinary
// to happen: a pod to get scheduled and move into Ready // to happen: a pod to get scheduled and move into Ready
const schedulingTimeout = 10 * time.Minute const (
const bigClusterSize = 7 bigClusterSize = 7
schedulingTimeout = 10 * time.Minute
timeout = 60 * time.Second
)
var _ = framework.KubeDescribe("DisruptionController", func() { var _ = framework.KubeDescribe("DisruptionController", func() {
f := framework.NewDefaultFramework("disruption") f := framework.NewDefaultFramework("disruption")

View File

@ -464,7 +464,7 @@ var _ = framework.KubeDescribe("DNS", func() {
// Test changing the externalName field // Test changing the externalName field
By("changing the externalName to bar.example.com") By("changing the externalName to bar.example.com")
_, err = updateService(f.ClientSet, f.Namespace.Name, serviceName, func(s *v1.Service) { _, err = framework.UpdateService(f.ClientSet, f.Namespace.Name, serviceName, func(s *v1.Service) {
s.Spec.ExternalName = "bar.example.com" s.Spec.ExternalName = "bar.example.com"
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -481,7 +481,7 @@ var _ = framework.KubeDescribe("DNS", func() {
// Test changing type from ExternalName to ClusterIP // Test changing type from ExternalName to ClusterIP
By("changing the service to type=ClusterIP") By("changing the service to type=ClusterIP")
_, err = updateService(f.ClientSet, f.Namespace.Name, serviceName, func(s *v1.Service) { _, err = framework.UpdateService(f.ClientSet, f.Namespace.Name, serviceName, func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeClusterIP s.Spec.Type = v1.ServiceTypeClusterIP
s.Spec.ClusterIP = "127.1.2.3" s.Spec.ClusterIP = "127.1.2.3"
s.Spec.Ports = []v1.ServicePort{ s.Spec.Ports = []v1.ServicePort{

View File

@ -408,9 +408,9 @@ func updateIngressOrFail(clientset *fedclientset.Clientset, namespace string) (n
func (j *federationTestJig) waitForFederatedIngress() { func (j *federationTestJig) waitForFederatedIngress() {
// Wait for the loadbalancer IP. // Wait for the loadbalancer IP.
address, err := waitForFederatedIngressAddress(j.client, j.ing.Namespace, j.ing.Name, lbPollTimeout) address, err := waitForFederatedIngressAddress(j.client, j.ing.Namespace, j.ing.Name, framework.LoadBalancerPollTimeout)
if err != nil { if err != nil {
framework.Failf("Ingress failed to acquire an IP address within %v", lbPollTimeout) framework.Failf("Ingress failed to acquire an IP address within %v", framework.LoadBalancerPollTimeout)
} }
j.address = address j.address = address
framework.Logf("Found address %v for ingress %v", j.address, j.ing.Name) framework.Logf("Found address %v for ingress %v", j.address, j.ing.Name)
@ -422,7 +422,7 @@ func (j *federationTestJig) waitForFederatedIngress() {
for _, p := range rules.IngressRuleValue.HTTP.Paths { for _, p := range rules.IngressRuleValue.HTTP.Paths {
route := fmt.Sprintf("%v://%v%v", proto, address, p.Path) route := fmt.Sprintf("%v://%v%v", proto, address, p.Path)
framework.Logf("Testing route %v host %v with simple GET", route, rules.Host) framework.Logf("Testing route %v host %v with simple GET", route, rules.Host)
framework.ExpectNoError(pollURL(route, rules.Host, lbPollTimeout, lbPollInterval, timeoutClient, false)) framework.ExpectNoError(pollURL(route, rules.Host, framework.LoadBalancerPollTimeout, framework.LoadBalancerPollInterval, timeoutClient, false))
} }
} }
} }

View File

@ -18,7 +18,6 @@ package e2e
import ( import (
"fmt" "fmt"
"time"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -31,14 +30,6 @@ import (
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )
const (
firewallTimeoutDefault = 3 * time.Minute
firewallTestTcpTimeout = time.Duration(1 * time.Second)
// Set ports outside of 30000-32767, 80 and 8080 to avoid being whitelisted by the e2e cluster
firewallTestHttpPort = int32(29999)
firewallTestUdpPort = int32(29998)
)
var _ = framework.KubeDescribe("Firewall rule", func() { var _ = framework.KubeDescribe("Firewall rule", func() {
var firewall_test_name = "firewall-test" var firewall_test_name = "firewall-test"
f := framework.NewDefaultFramework(firewall_test_name) f := framework.NewDefaultFramework(firewall_test_name)
@ -61,8 +52,8 @@ var _ = framework.KubeDescribe("Firewall rule", func() {
firewallTestSourceRanges := []string{"0.0.0.0/1", "128.0.0.0/1"} firewallTestSourceRanges := []string{"0.0.0.0/1", "128.0.0.0/1"}
serviceName := "firewall-test-loadbalancer" serviceName := "firewall-test-loadbalancer"
jig := NewServiceTestJig(cs, serviceName) jig := framework.NewServiceTestJig(cs, serviceName)
nodesNames := jig.GetNodesNames(maxNodesForEndpointsTests) nodesNames := jig.GetNodesNames(framework.MaxNodesForEndpointsTests)
if len(nodesNames) <= 0 { if len(nodesNames) <= 0 {
framework.Failf("Expect at least 1 node, got: %v", nodesNames) framework.Failf("Expect at least 1 node, got: %v", nodesNames)
} }
@ -70,9 +61,9 @@ var _ = framework.KubeDescribe("Firewall rule", func() {
// OnlyLocal service is needed to examine which exact nodes the requests are being forwarded to by the Load Balancer on GCE // OnlyLocal service is needed to examine which exact nodes the requests are being forwarded to by the Load Balancer on GCE
By("Creating a LoadBalancer type service with onlyLocal annotation") By("Creating a LoadBalancer type service with onlyLocal annotation")
svc := jig.createOnlyLocalLoadBalancerService(ns, serviceName, svc := jig.CreateOnlyLocalLoadBalancerService(ns, serviceName,
loadBalancerCreateTimeoutDefault, false, func(svc *v1.Service) { framework.LoadBalancerCreateTimeoutDefault, false, func(svc *v1.Service) {
svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: firewallTestHttpPort}} svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: framework.FirewallTestHttpPort}}
svc.Spec.LoadBalancerSourceRanges = firewallTestSourceRanges svc.Spec.LoadBalancerSourceRanges = firewallTestSourceRanges
}) })
defer func() { defer func() {
@ -91,10 +82,10 @@ var _ = framework.KubeDescribe("Firewall rule", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(framework.VerifyFirewallRule(fw, expFw, cloudConfig.Network, false)).NotTo(HaveOccurred()) Expect(framework.VerifyFirewallRule(fw, expFw, cloudConfig.Network, false)).NotTo(HaveOccurred())
By(fmt.Sprintf("Creating netexec pods on at most %v nodes", maxNodesForEndpointsTests)) By(fmt.Sprintf("Creating netexec pods on at most %v nodes", framework.MaxNodesForEndpointsTests))
for i, nodeName := range nodesNames { for i, nodeName := range nodesNames {
podName := fmt.Sprintf("netexec%v", i) podName := fmt.Sprintf("netexec%v", i)
jig.LaunchNetexecPodOnNode(f, nodeName, podName, firewallTestHttpPort, firewallTestUdpPort, true) jig.LaunchNetexecPodOnNode(f, nodeName, podName, framework.FirewallTestHttpPort, framework.FirewallTestUdpPort, true)
defer func() { defer func() {
framework.Logf("Cleaning up the netexec pod: %v", podName) framework.Logf("Cleaning up the netexec pod: %v", podName)
Expect(cs.Core().Pods(ns).Delete(podName, nil)).NotTo(HaveOccurred()) Expect(cs.Core().Pods(ns).Delete(podName, nil)).NotTo(HaveOccurred())
@ -103,7 +94,7 @@ var _ = framework.KubeDescribe("Firewall rule", func() {
// Send requests from outside of the cluster because internal traffic is whitelisted // Send requests from outside of the cluster because internal traffic is whitelisted
By("Accessing the external service ip from outside, all non-master nodes should be reached") By("Accessing the external service ip from outside, all non-master nodes should be reached")
Expect(testHitNodesFromOutside(svcExternalIP, firewallTestHttpPort, firewallTimeoutDefault, nodesSet)).NotTo(HaveOccurred()) Expect(framework.TestHitNodesFromOutside(svcExternalIP, framework.FirewallTestHttpPort, framework.FirewallTimeoutDefault, nodesSet)).NotTo(HaveOccurred())
// Check if there are overlapping tags on the firewall that extend beyond just the vms in our cluster // Check if there are overlapping tags on the firewall that extend beyond just the vms in our cluster
// by removing the tag on one vm and make sure it doesn't get any traffic. This is an imperfect // by removing the tag on one vm and make sure it doesn't get any traffic. This is an imperfect
@ -117,11 +108,11 @@ var _ = framework.KubeDescribe("Firewall rule", func() {
nodesSet.Insert(nodesNames[0]) nodesSet.Insert(nodesNames[0])
framework.SetInstanceTags(cloudConfig, nodesNames[0], removedTags) framework.SetInstanceTags(cloudConfig, nodesNames[0], removedTags)
// Make sure traffic is recovered before exit // Make sure traffic is recovered before exit
Expect(testHitNodesFromOutside(svcExternalIP, firewallTestHttpPort, firewallTimeoutDefault, nodesSet)).NotTo(HaveOccurred()) Expect(framework.TestHitNodesFromOutside(svcExternalIP, framework.FirewallTestHttpPort, framework.FirewallTimeoutDefault, nodesSet)).NotTo(HaveOccurred())
}() }()
By("Accessing serivce through the external ip and examine got no response from the node without tags") By("Accessing serivce through the external ip and examine got no response from the node without tags")
Expect(testHitNodesFromOutsideWithCount(svcExternalIP, firewallTestHttpPort, firewallTimeoutDefault, nodesSet, 15)).NotTo(HaveOccurred()) Expect(framework.TestHitNodesFromOutsideWithCount(svcExternalIP, framework.FirewallTestHttpPort, framework.FirewallTimeoutDefault, nodesSet, 15)).NotTo(HaveOccurred())
}) })
It("should have correct firewall rules for e2e cluster", func() { It("should have correct firewall rules for e2e cluster", func() {
@ -147,15 +138,15 @@ var _ = framework.KubeDescribe("Firewall rule", func() {
nodeAddrs := framework.NodeAddresses(nodes, v1.NodeExternalIP) nodeAddrs := framework.NodeAddresses(nodes, v1.NodeExternalIP)
Expect(len(nodeAddrs)).NotTo(BeZero()) Expect(len(nodeAddrs)).NotTo(BeZero())
masterAddr := framework.GetMasterAddress(cs) masterAddr := framework.GetMasterAddress(cs)
flag, _ := testNotReachableHTTPTimeout(masterAddr, ports.ControllerManagerPort, firewallTestTcpTimeout) flag, _ := framework.TestNotReachableHTTPTimeout(masterAddr, ports.ControllerManagerPort, framework.FirewallTestTcpTimeout)
Expect(flag).To(BeTrue()) Expect(flag).To(BeTrue())
flag, _ = testNotReachableHTTPTimeout(masterAddr, ports.SchedulerPort, firewallTestTcpTimeout) flag, _ = framework.TestNotReachableHTTPTimeout(masterAddr, ports.SchedulerPort, framework.FirewallTestTcpTimeout)
Expect(flag).To(BeTrue()) Expect(flag).To(BeTrue())
flag, _ = testNotReachableHTTPTimeout(nodeAddrs[0], ports.KubeletPort, firewallTestTcpTimeout) flag, _ = framework.TestNotReachableHTTPTimeout(nodeAddrs[0], ports.KubeletPort, framework.FirewallTestTcpTimeout)
Expect(flag).To(BeTrue()) Expect(flag).To(BeTrue())
flag, _ = testNotReachableHTTPTimeout(nodeAddrs[0], ports.KubeletReadOnlyPort, firewallTestTcpTimeout) flag, _ = framework.TestNotReachableHTTPTimeout(nodeAddrs[0], ports.KubeletReadOnlyPort, framework.FirewallTestTcpTimeout)
Expect(flag).To(BeTrue()) Expect(flag).To(BeTrue())
flag, _ = testNotReachableHTTPTimeout(nodeAddrs[0], ports.ProxyStatusPort, firewallTestTcpTimeout) flag, _ = framework.TestNotReachableHTTPTimeout(nodeAddrs[0], ports.ProxyStatusPort, framework.FirewallTestTcpTimeout)
Expect(flag).To(BeTrue()) Expect(flag).To(BeTrue())
}) })
}) })

View File

@ -26,6 +26,7 @@ go_library(
"perf_util.go", "perf_util.go",
"pods.go", "pods.go",
"resource_usage_gatherer.go", "resource_usage_gatherer.go",
"service_util.go",
"test_context.go", "test_context.go",
"util.go", "util.go",
], ],
@ -36,6 +37,7 @@ go_library(
"//pkg/api:go_default_library", "//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library", "//pkg/api/errors:go_default_library",
"//pkg/api/v1:go_default_library", "//pkg/api/v1:go_default_library",
"//pkg/api/v1/service:go_default_library",
"//pkg/api/validation:go_default_library", "//pkg/api/validation:go_default_library",
"//pkg/apimachinery/registered:go_default_library", "//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/apps/v1beta1:go_default_library",
@ -82,6 +84,7 @@ go_library(
"//pkg/util/exec:go_default_library", "//pkg/util/exec:go_default_library",
"//pkg/util/intstr:go_default_library", "//pkg/util/intstr:go_default_library",
"//pkg/util/labels:go_default_library", "//pkg/util/labels:go_default_library",
"//pkg/util/net:go_default_library",
"//pkg/util/rand:go_default_library", "//pkg/util/rand:go_default_library",
"//pkg/util/runtime:go_default_library", "//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library", "//pkg/util/sets:go_default_library",

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"time"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -31,6 +32,14 @@ import (
compute "google.golang.org/api/compute/v1" compute "google.golang.org/api/compute/v1"
) )
const (
FirewallTimeoutDefault = 3 * time.Minute
FirewallTestTcpTimeout = time.Duration(1 * time.Second)
// Set ports outside of 30000-32767, 80 and 8080 to avoid being whitelisted by the e2e cluster
FirewallTestHttpPort = int32(29999)
FirewallTestUdpPort = int32(29998)
)
// MakeFirewallNameForLBService return the expected firewall name for a LB service. // MakeFirewallNameForLBService return the expected firewall name for a LB service.
// This should match the formatting of makeFirewallName() in pkg/cloudprovider/providers/gce/gce.go // This should match the formatting of makeFirewallName() in pkg/cloudprovider/providers/gce/gce.go
func MakeFirewallNameForLBService(name string) string { func MakeFirewallNameForLBService(name string) string {

View File

@ -17,8 +17,13 @@ limitations under the License.
package framework package framework
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings" "strings"
"time" "time"
@ -30,6 +35,7 @@ import (
coreclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" coreclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/rand" "k8s.io/kubernetes/pkg/util/rand"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/uuid"
@ -586,3 +592,237 @@ func (config *NetworkingTestConfig) getServiceClient() coreclientset.ServiceInte
func (config *NetworkingTestConfig) getNamespacesClient() coreclientset.NamespaceInterface { func (config *NetworkingTestConfig) getNamespacesClient() coreclientset.NamespaceInterface {
return config.f.ClientSet.Core().Namespaces() return config.f.ClientSet.Core().Namespaces()
} }
func CheckReachabilityFromPod(expectToBeReachable bool, namespace, pod, target string) {
cmd := fmt.Sprintf("wget -T 5 -qO- %q", target)
err := wait.PollImmediate(Poll, 2*time.Minute, func() (bool, error) {
_, err := RunHostCmd(namespace, pod, cmd)
if expectToBeReachable && err != nil {
Logf("Expect target to be reachable. But got err: %v. Retry until timeout", err)
return false, nil
}
if !expectToBeReachable && err == nil {
Logf("Expect target NOT to be reachable. But it is reachable. Retry until timeout")
return false, nil
}
return true, nil
})
Expect(err).NotTo(HaveOccurred())
}
// Does an HTTP GET, but does not reuse TCP connections
// This masks problems where the iptables rule has changed, but we don't see it
// This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout
func httpGetNoConnectionPool(url string) (*http.Response, error) {
return httpGetNoConnectionPoolTimeout(url, 5*time.Second)
}
func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
tr := utilnet.SetTransportDefaults(&http.Transport{
DisableKeepAlives: true,
})
client := &http.Client{
Transport: tr,
Timeout: timeout,
}
return client.Get(url)
}
func TestReachableHTTP(ip string, port int, request string, expect string) (bool, error) {
return TestReachableHTTPWithContent(ip, port, request, expect, nil)
}
func TestReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) {
return TestReachableHTTPWithContentTimeout(ip, port, request, expect, content, 5*time.Second)
}
func TestReachableHTTPWithContentTimeout(ip string, port int, request string, expect string, content *bytes.Buffer, timeout time.Duration) (bool, error) {
url := fmt.Sprintf("http://%s:%d%s", ip, port, request)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", url)
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", url)
return false, nil
}
Logf("Testing HTTP reachability of %v", url)
resp, err := httpGetNoConnectionPoolTimeout(url, timeout)
if err != nil {
Logf("Got error testing for reachability of %s: %v", url, err)
return false, nil
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
Logf("Got error reading response from %s: %v", url, err)
return false, nil
}
if resp.StatusCode != 200 {
return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s",
resp.Status, url, string(body))
}
if !strings.Contains(string(body), expect) {
return false, fmt.Errorf("received response body without expected substring %q: %s", expect, string(body))
}
if content != nil {
content.Write(body)
}
return true, nil
}
func TestNotReachableHTTP(ip string, port int) (bool, error) {
return TestNotReachableHTTPTimeout(ip, port, 5*time.Second)
}
func TestNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) (bool, error) {
url := fmt.Sprintf("http://%s:%d", ip, port)
if ip == "" {
Failf("Got empty IP for non-reachability check (%s)", url)
return false, nil
}
if port == 0 {
Failf("Got port==0 for non-reachability check (%s)", url)
return false, nil
}
Logf("Testing HTTP non-reachability of %v", url)
resp, err := httpGetNoConnectionPoolTimeout(url, timeout)
if err != nil {
Logf("Confirmed that %s is not reachable", url)
return true, nil
}
resp.Body.Close()
return false, nil
}
func TestReachableUDP(ip string, port int, request string, expect string) (bool, error) {
uri := fmt.Sprintf("udp://%s:%d", ip, port)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", uri)
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", uri)
return false, nil
}
Logf("Testing UDP reachability of %v", uri)
con, err := net.Dial("udp", ip+":"+strconv.Itoa(port))
if err != nil {
return false, fmt.Errorf("Failed to dial %s:%d: %v", ip, port, err)
}
_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
if err != nil {
return false, fmt.Errorf("Failed to send request: %v", err)
}
var buf []byte = make([]byte, len(expect)+1)
err = con.SetDeadline(time.Now().Add(3 * time.Second))
if err != nil {
return false, fmt.Errorf("Failed to set deadline: %v", err)
}
_, err = con.Read(buf)
if err != nil {
return false, nil
}
if !strings.Contains(string(buf), expect) {
return false, fmt.Errorf("Failed to retrieve %q, got %q", expect, string(buf))
}
Logf("Successfully reached %v", uri)
return true, nil
}
func TestNotReachableUDP(ip string, port int, request string) (bool, error) {
uri := fmt.Sprintf("udp://%s:%d", ip, port)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", uri)
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", uri)
return false, nil
}
Logf("Testing UDP non-reachability of %v", uri)
con, err := net.Dial("udp", ip+":"+strconv.Itoa(port))
if err != nil {
Logf("Confirmed that %s is not reachable", uri)
return true, nil
}
_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
if err != nil {
Logf("Confirmed that %s is not reachable", uri)
return true, nil
}
var buf []byte = make([]byte, 1)
err = con.SetDeadline(time.Now().Add(3 * time.Second))
if err != nil {
return false, fmt.Errorf("Failed to set deadline: %v", err)
}
_, err = con.Read(buf)
if err != nil {
Logf("Confirmed that %s is not reachable", uri)
return true, nil
}
return false, nil
}
func TestHitNodesFromOutside(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String) error {
return TestHitNodesFromOutsideWithCount(externalIP, httpPort, timeout, expectedHosts, 1)
}
func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String,
countToSucceed int) error {
Logf("Waiting up to %v for satisfying expectedHosts for %v times", timeout, countToSucceed)
hittedHosts := sets.NewString()
count := 0
condition := func() (bool, error) {
var respBody bytes.Buffer
reached, err := TestReachableHTTPWithContentTimeout(externalIP, int(httpPort), "/hostname", "", &respBody,
1*time.Second)
if err != nil || !reached {
return false, nil
}
hittedHost := strings.TrimSpace(respBody.String())
if !expectedHosts.Has(hittedHost) {
Logf("Error hitting unexpected host: %v, reset counter: %v", hittedHost, count)
count = 0
return false, nil
}
if !hittedHosts.Has(hittedHost) {
hittedHosts.Insert(hittedHost)
Logf("Missing %+v, got %+v", expectedHosts.Difference(hittedHosts), hittedHosts)
}
if hittedHosts.Equal(expectedHosts) {
count++
if count >= countToSucceed {
return true, nil
}
}
return false, nil
}
if err := wait.Poll(time.Second, timeout, condition); err != nil {
return fmt.Errorf("error waiting for expectedHosts: %v, hittedHosts: %v, count: %v, expected count: %v",
expectedHosts, hittedHosts, count, countToSucceed)
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -3829,6 +3829,80 @@ func LaunchHostExecPod(client clientset.Interface, ns, name string) *v1.Pod {
return pod return pod
} }
// newExecPodSpec returns the pod spec of exec pod
func newExecPodSpec(ns, generateName string) *v1.Pod {
immediate := int64(0)
pod := &v1.Pod{
ObjectMeta: v1.ObjectMeta{
GenerateName: generateName,
Namespace: ns,
},
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: &immediate,
Containers: []v1.Container{
{
Name: "exec",
Image: "gcr.io/google_containers/busybox:1.24",
Command: []string{"sh", "-c", "while true; do sleep 5; done"},
},
},
},
}
return pod
}
// CreateExecPodOrFail creates a simple busybox pod in a sleep loop used as a
// vessel for kubectl exec commands.
// Returns the name of the created pod.
func CreateExecPodOrFail(client clientset.Interface, ns, generateName string, tweak func(*v1.Pod)) string {
Logf("Creating new exec pod")
execPod := newExecPodSpec(ns, generateName)
if tweak != nil {
tweak(execPod)
}
created, err := client.Core().Pods(ns).Create(execPod)
Expect(err).NotTo(HaveOccurred())
err = wait.PollImmediate(Poll, 5*time.Minute, func() (bool, error) {
retrievedPod, err := client.Core().Pods(execPod.Namespace).Get(created.Name, metav1.GetOptions{})
if err != nil {
return false, nil
}
return retrievedPod.Status.Phase == v1.PodRunning, nil
})
Expect(err).NotTo(HaveOccurred())
return created.Name
}
func CreatePodOrFail(c clientset.Interface, ns, name string, labels map[string]string, containerPorts []v1.ContainerPort) {
By(fmt.Sprintf("Creating pod %s in namespace %s", name, ns))
pod := &v1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: name,
Labels: labels,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "pause",
Image: GetPauseImageName(c),
Ports: containerPorts,
// Add a dummy environment variable to work around a docker issue.
// https://github.com/docker/docker/issues/14203
Env: []v1.EnvVar{{Name: "FOO", Value: " "}},
},
},
},
}
_, err := c.Core().Pods(ns).Create(pod)
Expect(err).NotTo(HaveOccurred())
}
func DeletePodOrFail(c clientset.Interface, ns, name string) {
By(fmt.Sprintf("Deleting pod %s in namespace %s", name, ns))
err := c.Core().Pods(ns).Delete(name, nil)
Expect(err).NotTo(HaveOccurred())
}
// GetSigner returns an ssh.Signer for the provider ("gce", etc.) that can be // GetSigner returns an ssh.Signer for the provider ("gce", etc.) that can be
// used to SSH to their nodes. // used to SSH to their nodes.
func GetSigner(provider string) (ssh.Signer, error) { func GetSigner(provider string) (ssh.Signer, error) {
@ -5159,3 +5233,51 @@ func GetNodeExternalIP(node *v1.Node) string {
} }
return host return host
} }
// RcByNamePort returns a ReplicationController with specified name and port
func RcByNamePort(name string, replicas int32, image string, port int, protocol v1.Protocol,
labels map[string]string, gracePeriod *int64) *v1.ReplicationController {
return RcByNameContainer(name, replicas, image, labels, v1.Container{
Name: name,
Image: image,
Ports: []v1.ContainerPort{{ContainerPort: int32(port), Protocol: protocol}},
}, gracePeriod)
}
// RcByNameContainer returns a ReplicationControoler with specified name and container
func RcByNameContainer(name string, replicas int32, image string, labels map[string]string, c v1.Container,
gracePeriod *int64) *v1.ReplicationController {
zeroGracePeriod := int64(0)
// Add "name": name to the labels, overwriting if it exists.
labels["name"] = name
if gracePeriod == nil {
gracePeriod = &zeroGracePeriod
}
return &v1.ReplicationController{
TypeMeta: metav1.TypeMeta{
Kind: "ReplicationController",
APIVersion: registered.GroupOrDie(v1.GroupName).GroupVersion.String(),
},
ObjectMeta: v1.ObjectMeta{
Name: name,
},
Spec: v1.ReplicationControllerSpec{
Replicas: func(i int32) *int32 { return &i }(replicas),
Selector: map[string]string{
"name": name,
},
Template: &v1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: labels,
},
Spec: v1.PodSpec{
Containers: []v1.Container{c},
TerminationGracePeriodSeconds: gracePeriod,
},
},
},
}
}

View File

@ -40,17 +40,9 @@ const (
// healthz port used to verify glbc restarted correctly on the master. // healthz port used to verify glbc restarted correctly on the master.
glbcHealthzPort = 8086 glbcHealthzPort = 8086
// On average it takes ~6 minutes for a single backend to come online in GCE.
lbPollTimeout = 15 * time.Minute
// General cloud resource poll timeout (eg: create static ip, firewall etc) // General cloud resource poll timeout (eg: create static ip, firewall etc)
cloudResourcePollTimeout = 5 * time.Minute cloudResourcePollTimeout = 5 * time.Minute
// Time required by the loadbalancer to cleanup, proportional to numApps/Ing.
// Bring the cleanup timeout back down to 5m once b/33588344 is resolved.
lbCleanupTimeout = 15 * time.Minute
lbPollInterval = 30 * time.Second
// Name of the config-map and key the ingress controller stores its uid in. // Name of the config-map and key the ingress controller stores its uid in.
uidConfigMap = "ingress-uid" uidConfigMap = "ingress-uid"
uidKey = "uid" uidKey = "uid"
@ -145,10 +137,10 @@ var _ = framework.KubeDescribe("Loadbalancing: L7", func() {
By("waiting for Ingress to come up with ip: " + ip) By("waiting for Ingress to come up with ip: " + ip)
httpClient := buildInsecureClient(reqTimeout) httpClient := buildInsecureClient(reqTimeout)
framework.ExpectNoError(pollURL(fmt.Sprintf("https://%v/", ip), "", lbPollTimeout, jig.pollInterval, httpClient, false)) framework.ExpectNoError(pollURL(fmt.Sprintf("https://%v/", ip), "", framework.LoadBalancerPollTimeout, jig.pollInterval, httpClient, false))
By("should reject HTTP traffic") By("should reject HTTP traffic")
framework.ExpectNoError(pollURL(fmt.Sprintf("http://%v/", ip), "", lbPollTimeout, jig.pollInterval, httpClient, true)) framework.ExpectNoError(pollURL(fmt.Sprintf("http://%v/", ip), "", framework.LoadBalancerPollTimeout, jig.pollInterval, httpClient, true))
By("should have correct firewall rule for ingress") By("should have correct firewall rule for ingress")
fw := gceController.getFirewallRule() fw := gceController.getFirewallRule()

View File

@ -181,7 +181,7 @@ func createComformanceTests(jig *testJig, ns string) []conformanceTests {
}) })
By("Checking that " + pathToFail + " is not exposed by polling for failure") By("Checking that " + pathToFail + " is not exposed by polling for failure")
route := fmt.Sprintf("http://%v%v", jig.address, pathToFail) route := fmt.Sprintf("http://%v%v", jig.address, pathToFail)
framework.ExpectNoError(pollURL(route, updateURLMapHost, lbCleanupTimeout, jig.pollInterval, &http.Client{Timeout: reqTimeout}, true)) framework.ExpectNoError(pollURL(route, updateURLMapHost, framework.LoadBalancerCleanupTimeout, jig.pollInterval, &http.Client{Timeout: reqTimeout}, true))
}, },
fmt.Sprintf("Waiting for path updates to reflect in L7"), fmt.Sprintf("Waiting for path updates to reflect in L7"),
}, },
@ -335,7 +335,7 @@ func describeIng(ns string) {
} }
func cleanupGCE(gceController *GCEIngressController) { func cleanupGCE(gceController *GCEIngressController) {
pollErr := wait.Poll(5*time.Second, lbCleanupTimeout, func() (bool, error) { pollErr := wait.Poll(5*time.Second, framework.LoadBalancerCleanupTimeout, func() (bool, error) {
if err := gceController.Cleanup(false); err != nil { if err := gceController.Cleanup(false); err != nil {
framework.Logf("Still waiting for glbc to cleanup:\n%v", err) framework.Logf("Still waiting for glbc to cleanup:\n%v", err)
return false, nil return false, nil
@ -347,7 +347,7 @@ func cleanupGCE(gceController *GCEIngressController) {
// controller. Delete this IP only after the controller has had a chance // controller. Delete this IP only after the controller has had a chance
// to cleanup or it might interfere with the controller, causing it to // to cleanup or it might interfere with the controller, causing it to
// throw out confusing events. // throw out confusing events.
if ipErr := wait.Poll(5*time.Second, lbCleanupTimeout, func() (bool, error) { if ipErr := wait.Poll(5*time.Second, framework.LoadBalancerCleanupTimeout, func() (bool, error) {
if err := gceController.deleteStaticIPs(); err != nil { if err := gceController.deleteStaticIPs(); err != nil {
framework.Logf("Failed to delete static-ip: %v\n", err) framework.Logf("Failed to delete static-ip: %v\n", err)
return false, nil return false, nil
@ -864,9 +864,9 @@ func (j *testJig) deleteIngress() {
// Ingress. // Ingress.
func (j *testJig) waitForIngress(waitForNodePort bool) { func (j *testJig) waitForIngress(waitForNodePort bool) {
// Wait for the loadbalancer IP. // Wait for the loadbalancer IP.
address, err := framework.WaitForIngressAddress(j.client, j.ing.Namespace, j.ing.Name, lbPollTimeout) address, err := framework.WaitForIngressAddress(j.client, j.ing.Namespace, j.ing.Name, framework.LoadBalancerPollTimeout)
if err != nil { if err != nil {
framework.Failf("Ingress failed to acquire an IP address within %v", lbPollTimeout) framework.Failf("Ingress failed to acquire an IP address within %v", framework.LoadBalancerPollTimeout)
} }
j.address = address j.address = address
framework.Logf("Found address %v for ingress %v", j.address, j.ing.Name) framework.Logf("Found address %v for ingress %v", j.address, j.ing.Name)
@ -889,7 +889,7 @@ func (j *testJig) waitForIngress(waitForNodePort bool) {
} }
route := fmt.Sprintf("%v://%v%v", proto, address, p.Path) route := fmt.Sprintf("%v://%v%v", proto, address, p.Path)
framework.Logf("Testing route %v host %v with simple GET", route, rules.Host) framework.Logf("Testing route %v host %v with simple GET", route, rules.Host)
framework.ExpectNoError(pollURL(route, rules.Host, lbPollTimeout, j.pollInterval, timeoutClient, false)) framework.ExpectNoError(pollURL(route, rules.Host, framework.LoadBalancerPollTimeout, j.pollInterval, timeoutClient, false))
} }
} }
} }
@ -1011,7 +1011,7 @@ type GCEIngressController struct {
} }
func newTestJig(c clientset.Interface) *testJig { func newTestJig(c clientset.Interface) *testJig {
return &testJig{client: c, rootCAs: map[string][]byte{}, pollInterval: lbPollInterval} return &testJig{client: c, rootCAs: map[string][]byte{}, pollInterval: framework.LoadBalancerPollInterval}
} }
// NginxIngressController manages implementation details of Ingress on Nginx. // NginxIngressController manages implementation details of Ingress on Nginx.

View File

@ -47,7 +47,7 @@ var _ = framework.KubeDescribe("Network", func() {
It("should set TCP CLOSE_WAIT timeout", func() { It("should set TCP CLOSE_WAIT timeout", func() {
nodes := framework.GetReadySchedulableNodesOrDie(fr.ClientSet) nodes := framework.GetReadySchedulableNodesOrDie(fr.ClientSet)
ips := collectAddresses(nodes, v1.NodeInternalIP) ips := framework.CollectAddresses(nodes, v1.NodeInternalIP)
if len(nodes.Items) < 2 { if len(nodes.Items) < 2 {
framework.Skipf( framework.Skipf(

View File

@ -824,7 +824,7 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
return false, err return false, err
} }
uidToPort := getContainerPortsByPodUID(endpoints) uidToPort := framework.GetContainerPortsByPodUID(endpoints)
if len(uidToPort) == 0 { if len(uidToPort) == 0 {
framework.Logf("No endpoint found, retrying") framework.Logf("No endpoint found, retrying")
return false, nil return false, nil

View File

@ -25,7 +25,6 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/intstr"
@ -160,56 +159,10 @@ func newSVCByName(c clientset.Interface, ns, name string) error {
return err return err
} }
func rcByNamePort(name string, replicas int32, image string, port int, protocol v1.Protocol,
labels map[string]string, gracePeriod *int64) *v1.ReplicationController {
return rcByNameContainer(name, replicas, image, labels, v1.Container{
Name: name,
Image: image,
Ports: []v1.ContainerPort{{ContainerPort: int32(port), Protocol: protocol}},
}, gracePeriod)
}
func rcByNameContainer(name string, replicas int32, image string, labels map[string]string, c v1.Container,
gracePeriod *int64) *v1.ReplicationController {
zeroGracePeriod := int64(0)
// Add "name": name to the labels, overwriting if it exists.
labels["name"] = name
if gracePeriod == nil {
gracePeriod = &zeroGracePeriod
}
return &v1.ReplicationController{
TypeMeta: metav1.TypeMeta{
Kind: "ReplicationController",
APIVersion: registered.GroupOrDie(v1.GroupName).GroupVersion.String(),
},
ObjectMeta: v1.ObjectMeta{
Name: name,
},
Spec: v1.ReplicationControllerSpec{
Replicas: func(i int32) *int32 { return &i }(replicas),
Selector: map[string]string{
"name": name,
},
Template: &v1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: labels,
},
Spec: v1.PodSpec{
Containers: []v1.Container{c},
TerminationGracePeriodSeconds: gracePeriod,
},
},
},
}
}
// newRCByName creates a replication controller with a selector by name of name. // newRCByName creates a replication controller with a selector by name of name.
func newRCByName(c clientset.Interface, ns, name string, replicas int32, gracePeriod *int64) (*v1.ReplicationController, error) { func newRCByName(c clientset.Interface, ns, name string, replicas int32, gracePeriod *int64) (*v1.ReplicationController, error) {
By(fmt.Sprintf("creating replication controller %s", name)) By(fmt.Sprintf("creating replication controller %s", name))
return c.Core().ReplicationControllers(ns).Create(rcByNamePort( return c.Core().ReplicationControllers(ns).Create(framework.RcByNamePort(
name, replicas, serveHostnameImage, 9376, v1.ProtocolTCP, map[string]string{}, gracePeriod)) name, replicas, serveHostnameImage, 9376, v1.ProtocolTCP, map[string]string{}, gracePeriod))
} }

File diff suppressed because it is too large Load Diff