diff --git a/test/e2e/kubeproxy.go b/test/e2e/kubeproxy.go deleted file mode 100644 index 7d381bcf9c1..00000000000 --- a/test/e2e/kubeproxy.go +++ /dev/null @@ -1,577 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package e2e - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "strconv" - "strings" - "time" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - api "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/apimachinery/registered" - client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/util/intstr" - utilnet "k8s.io/kubernetes/pkg/util/net" - "k8s.io/kubernetes/pkg/util/uuid" - "k8s.io/kubernetes/pkg/util/wait" - "k8s.io/kubernetes/test/e2e/framework" -) - -const ( - endpointHttpPort = 8080 - endpointUdpPort = 8081 - testContainerHttpPort = 8080 - clusterHttpPort = 80 - clusterUdpPort = 90 - nodeHttpPort = 32080 - nodeUdpPort = 32081 - loadBalancerHttpPort = 100 - netexecImageName = "gcr.io/google_containers/netexec:1.5" - testPodName = "test-container-pod" - hostTestPodName = "host-test-container-pod" - nodePortServiceName = "node-port-service" - loadBalancerServiceName = "load-balancer-service" - enableLoadBalancerTest = false - hitEndpointRetryDelay = 1 * time.Second - // Number of retries to hit a given set of endpoints. Needs to be high - // because we verify iptables statistical rr loadbalancing. - testTries = 30 -) - -type KubeProxyTestConfig struct { - testContainerPod *api.Pod - hostTestContainerPod *api.Pod - endpointPods []*api.Pod - f *framework.Framework - nodePortService *api.Service - loadBalancerService *api.Service - externalAddrs []string - nodes []api.Node -} - -var _ = framework.KubeDescribe("KubeProxy", func() { - f := framework.NewDefaultFramework("e2e-kubeproxy") - config := &KubeProxyTestConfig{ - f: f, - } - - // Slow issue #14204 (10 min) - It("should test kube-proxy [Slow]", func() { - By("cleaning up any pre-existing namespaces used by this test") - config.cleanup() - - By("Setting up for the tests") - config.setup() - - //TODO Need to add hit externalIPs test - By("TODO: Need to add hit externalIPs test") - - By("Hit Test with All Endpoints") - config.hitAll() - - config.deleteNetProxyPod() - By("Hit Test with Fewer Endpoints") - config.hitAll() - - By("Deleting nodePortservice and ensuring that service cannot be hit") - config.deleteNodePortService() - config.hitNodePort(0) // expect 0 endpoints to be hit - - if enableLoadBalancerTest { - By("Deleting loadBalancerService and ensuring that service cannot be hit") - config.deleteLoadBalancerService() - config.hitLoadBalancer(0) // expect 0 endpoints to be hit - } - }) -}) - -func (config *KubeProxyTestConfig) hitAll() { - By("Hitting endpoints from host and container") - config.hitEndpoints() - - By("Hitting clusterIP from host and container") - config.hitClusterIP(len(config.endpointPods)) - - By("Hitting nodePort from host and container") - config.hitNodePort(len(config.endpointPods)) - - if enableLoadBalancerTest { - By("Waiting for LoadBalancer Ingress Setup") - config.waitForLoadBalancerIngressSetup() - - By("Hitting LoadBalancer") - config.hitLoadBalancer(len(config.endpointPods)) - } -} - -func (config *KubeProxyTestConfig) hitLoadBalancer(epCount int) { - lbIP := config.loadBalancerService.Status.LoadBalancer.Ingress[0].IP - hostNames := make(map[string]bool) - tries := epCount*epCount + 5 - for i := 0; i < tries; i++ { - transport := utilnet.SetTransportDefaults(&http.Transport{}) - httpClient := createHTTPClient(transport) - resp, err := httpClient.Get(fmt.Sprintf("http://%s:%d/hostName", lbIP, loadBalancerHttpPort)) - if err == nil { - defer resp.Body.Close() - hostName, err := ioutil.ReadAll(resp.Body) - if err == nil { - hostNames[string(hostName)] = true - } - } - transport.CloseIdleConnections() - } - Expect(len(hostNames)).To(BeNumerically("==", epCount), "LoadBalancer did not hit all pods") -} - -func createHTTPClient(transport *http.Transport) *http.Client { - client := &http.Client{ - Transport: transport, - Timeout: 5 * time.Second, - } - return client -} - -func (config *KubeProxyTestConfig) hitClusterIP(epCount int) { - clusterIP := config.nodePortService.Spec.ClusterIP - tries := epCount*epCount + testTries // if epCount == 0 - By("dialing(udp) node1 --> clusterIP:clusterUdpPort") - config.dialFromNode("udp", clusterIP, clusterUdpPort, tries, epCount) - By("dialing(http) node1 --> clusterIP:clusterHttpPort") - config.dialFromNode("http", clusterIP, clusterHttpPort, tries, epCount) - - By("dialing(udp) test container --> clusterIP:clusterUdpPort") - config.dialFromTestContainer("udp", clusterIP, clusterUdpPort, tries, epCount) - By("dialing(http) test container --> clusterIP:clusterHttpPort") - config.dialFromTestContainer("http", clusterIP, clusterHttpPort, tries, epCount) - - By("dialing(udp) endpoint container --> clusterIP:clusterUdpPort") - config.dialFromEndpointContainer("udp", clusterIP, clusterUdpPort, tries, epCount) - By("dialing(http) endpoint container --> clusterIP:clusterHttpPort") - config.dialFromEndpointContainer("http", clusterIP, clusterHttpPort, tries, epCount) -} - -func (config *KubeProxyTestConfig) hitNodePort(epCount int) { - node1_IP := config.externalAddrs[0] - tries := epCount*epCount + testTries // if epCount == 0 - By("dialing(udp) node1 --> node1:nodeUdpPort") - config.dialFromNode("udp", node1_IP, nodeUdpPort, tries, epCount) - By("dialing(http) node1 --> node1:nodeHttpPort") - config.dialFromNode("http", node1_IP, nodeHttpPort, tries, epCount) - - By("dialing(udp) test container --> node1:nodeUdpPort") - config.dialFromTestContainer("udp", node1_IP, nodeUdpPort, tries, epCount) - By("dialing(http) test container --> node1:nodeHttpPort") - config.dialFromTestContainer("http", node1_IP, nodeHttpPort, tries, epCount) - - By("dialing(udp) endpoint container --> node1:nodeUdpPort") - config.dialFromEndpointContainer("udp", node1_IP, nodeUdpPort, tries, epCount) - By("dialing(http) endpoint container --> node1:nodeHttpPort") - config.dialFromEndpointContainer("http", node1_IP, nodeHttpPort, tries, epCount) - - By("dialing(udp) node --> 127.0.0.1:nodeUdpPort") - config.dialFromNode("udp", "127.0.0.1", nodeUdpPort, tries, epCount) - By("dialing(http) node --> 127.0.0.1:nodeHttpPort") - config.dialFromNode("http", "127.0.0.1", nodeHttpPort, tries, epCount) - - node2_IP := config.externalAddrs[1] - By("dialing(udp) node1 --> node2:nodeUdpPort") - config.dialFromNode("udp", node2_IP, nodeUdpPort, tries, epCount) - By("dialing(http) node1 --> node2:nodeHttpPort") - config.dialFromNode("http", node2_IP, nodeHttpPort, tries, epCount) - - By("checking kube-proxy URLs") - config.getSelfURL("/healthz", "ok") - config.getSelfURL("/proxyMode", "iptables") // the default -} - -func (config *KubeProxyTestConfig) hitEndpoints() { - for _, endpointPod := range config.endpointPods { - Expect(len(endpointPod.Status.PodIP)).To(BeNumerically(">", 0), "podIP is empty:%s", endpointPod.Status.PodIP) - By("dialing(udp) endpointPodIP:endpointUdpPort from node1") - config.dialFromNode("udp", endpointPod.Status.PodIP, endpointUdpPort, 5, 1) - By("dialing(http) endpointPodIP:endpointHttpPort from node1") - config.dialFromNode("http", endpointPod.Status.PodIP, endpointHttpPort, 5, 1) - By("dialing(udp) endpointPodIP:endpointUdpPort from test container") - config.dialFromTestContainer("udp", endpointPod.Status.PodIP, endpointUdpPort, 5, 1) - By("dialing(http) endpointPodIP:endpointHttpPort from test container") - config.dialFromTestContainer("http", endpointPod.Status.PodIP, endpointHttpPort, 5, 1) - } -} - -func (config *KubeProxyTestConfig) dialFromEndpointContainer(protocol, targetIP string, targetPort, tries, expectedCount int) { - config.dialFromContainer(protocol, config.endpointPods[0].Status.PodIP, targetIP, endpointHttpPort, targetPort, tries, expectedCount) -} - -func (config *KubeProxyTestConfig) dialFromTestContainer(protocol, targetIP string, targetPort, tries, expectedCount int) { - config.dialFromContainer(protocol, config.testContainerPod.Status.PodIP, targetIP, testContainerHttpPort, targetPort, tries, expectedCount) -} - -func (config *KubeProxyTestConfig) dialFromContainer(protocol, containerIP, targetIP string, containerHttpPort, targetPort, tries, expectedCount int) { - cmd := fmt.Sprintf("curl -q 'http://%s:%d/dial?request=hostName&protocol=%s&host=%s&port=%d&tries=%d'", - containerIP, - containerHttpPort, - protocol, - targetIP, - targetPort, - tries) - - By(fmt.Sprintf("Dialing from container. Running command:%s", cmd)) - stdout := framework.RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, cmd) - var output map[string][]string - err := json.Unmarshal([]byte(stdout), &output) - Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Could not unmarshal curl response: %s", stdout)) - hostNamesMap := array2map(output["responses"]) - Expect(len(hostNamesMap)).To(BeNumerically("==", expectedCount), fmt.Sprintf("Response was:%v", output)) -} - -func (config *KubeProxyTestConfig) dialFromNode(protocol, targetIP string, targetPort, tries, expectedCount int) { - var cmd string - if protocol == "udp" { - cmd = fmt.Sprintf("echo 'hostName' | timeout -t 3 nc -w 1 -u %s %d", targetIP, targetPort) - } else { - cmd = fmt.Sprintf("curl -s --connect-timeout 1 http://%s:%d/hostName", targetIP, targetPort) - } - // TODO: This simply tells us that we can reach the endpoints. Check that - // the probability of hitting a specific endpoint is roughly the same as - // hitting any other. - forLoop := fmt.Sprintf("for i in $(seq 1 %d); do %s; echo; sleep %v; done | grep -v '^\\s*$' |sort | uniq -c | wc -l", tries, cmd, hitEndpointRetryDelay) - By(fmt.Sprintf("Dialing from node. command:%s", forLoop)) - stdout := framework.RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, forLoop) - Expect(strconv.Atoi(strings.TrimSpace(stdout))).To(BeNumerically("==", expectedCount)) -} - -func (config *KubeProxyTestConfig) getSelfURL(path string, expected string) { - cmd := fmt.Sprintf("curl -s --connect-timeout 1 http://localhost:10249%s", path) - By(fmt.Sprintf("Getting kube-proxy self URL %s", path)) - stdout := framework.RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, cmd) - Expect(strings.Contains(stdout, expected)).To(BeTrue()) -} - -func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node string) *api.Pod { - probe := &api.Probe{ - InitialDelaySeconds: 10, - TimeoutSeconds: 30, - PeriodSeconds: 10, - SuccessThreshold: 1, - FailureThreshold: 3, - Handler: api.Handler{ - HTTPGet: &api.HTTPGetAction{ - Path: "/healthz", - Port: intstr.IntOrString{IntVal: endpointHttpPort}, - }, - }, - } - pod := &api.Pod{ - TypeMeta: unversioned.TypeMeta{ - Kind: "Pod", - APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(), - }, - ObjectMeta: api.ObjectMeta{ - Name: podName, - Namespace: config.f.Namespace.Name, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "webserver", - Image: netexecImageName, - ImagePullPolicy: api.PullIfNotPresent, - Command: []string{ - "/netexec", - fmt.Sprintf("--http-port=%d", endpointHttpPort), - fmt.Sprintf("--udp-port=%d", endpointUdpPort), - }, - Ports: []api.ContainerPort{ - { - Name: "http", - ContainerPort: endpointHttpPort, - }, - { - Name: "udp", - ContainerPort: endpointUdpPort, - Protocol: api.ProtocolUDP, - }, - }, - LivenessProbe: probe, - ReadinessProbe: probe, - }, - }, - NodeName: node, - }, - } - return pod -} - -func (config *KubeProxyTestConfig) createTestPodSpec() *api.Pod { - pod := &api.Pod{ - TypeMeta: unversioned.TypeMeta{ - Kind: "Pod", - APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(), - }, - ObjectMeta: api.ObjectMeta{ - Name: testPodName, - Namespace: config.f.Namespace.Name, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "webserver", - Image: netexecImageName, - ImagePullPolicy: api.PullIfNotPresent, - Command: []string{ - "/netexec", - fmt.Sprintf("--http-port=%d", endpointHttpPort), - fmt.Sprintf("--udp-port=%d", endpointUdpPort), - }, - Ports: []api.ContainerPort{ - { - Name: "http", - ContainerPort: testContainerHttpPort, - }, - }, - }, - }, - }, - } - return pod -} - -func (config *KubeProxyTestConfig) createNodePortService(selector map[string]string) { - serviceSpec := &api.Service{ - ObjectMeta: api.ObjectMeta{ - Name: nodePortServiceName, - }, - Spec: api.ServiceSpec{ - Type: api.ServiceTypeNodePort, - Ports: []api.ServicePort{ - {Port: clusterHttpPort, Name: "http", Protocol: api.ProtocolTCP, NodePort: nodeHttpPort, TargetPort: intstr.FromInt(endpointHttpPort)}, - {Port: clusterUdpPort, Name: "udp", Protocol: api.ProtocolUDP, NodePort: nodeUdpPort, TargetPort: intstr.FromInt(endpointUdpPort)}, - }, - Selector: selector, - }, - } - config.nodePortService = config.createService(serviceSpec) -} - -func (config *KubeProxyTestConfig) deleteNodePortService() { - err := config.getServiceClient().Delete(config.nodePortService.Name) - Expect(err).NotTo(HaveOccurred(), "error while deleting NodePortService. err:%v)", err) - time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted. -} - -func (config *KubeProxyTestConfig) createLoadBalancerService(selector map[string]string) { - serviceSpec := &api.Service{ - ObjectMeta: api.ObjectMeta{ - Name: loadBalancerServiceName, - }, - Spec: api.ServiceSpec{ - Type: api.ServiceTypeLoadBalancer, - Ports: []api.ServicePort{ - {Port: loadBalancerHttpPort, Name: "http", Protocol: "TCP", TargetPort: intstr.FromInt(endpointHttpPort)}, - }, - Selector: selector, - }, - } - config.createService(serviceSpec) -} - -func (config *KubeProxyTestConfig) deleteLoadBalancerService() { - go func() { config.getServiceClient().Delete(config.loadBalancerService.Name) }() - time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted. -} - -func (config *KubeProxyTestConfig) waitForLoadBalancerIngressSetup() { - err := wait.Poll(2*time.Second, 120*time.Second, func() (bool, error) { - service, err := config.getServiceClient().Get(loadBalancerServiceName) - if err != nil { - return false, err - } else { - if len(service.Status.LoadBalancer.Ingress) > 0 { - return true, nil - } else { - return false, fmt.Errorf("Service LoadBalancer Ingress was not setup.") - } - } - }) - Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to setup Load Balancer Service. err:%v", err)) - config.loadBalancerService, _ = config.getServiceClient().Get(loadBalancerServiceName) -} - -func (config *KubeProxyTestConfig) createTestPods() { - testContainerPod := config.createTestPodSpec() - hostTestContainerPod := framework.NewHostExecPodSpec(config.f.Namespace.Name, hostTestPodName) - - config.createPod(testContainerPod) - config.createPod(hostTestContainerPod) - - framework.ExpectNoError(config.f.WaitForPodRunning(testContainerPod.Name)) - framework.ExpectNoError(config.f.WaitForPodRunning(hostTestContainerPod.Name)) - - var err error - config.testContainerPod, err = config.getPodClient().Get(testContainerPod.Name) - if err != nil { - framework.Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err) - } - - config.hostTestContainerPod, err = config.getPodClient().Get(hostTestContainerPod.Name) - if err != nil { - framework.Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err) - } -} - -func (config *KubeProxyTestConfig) createService(serviceSpec *api.Service) *api.Service { - _, err := config.getServiceClient().Create(serviceSpec) - Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err)) - - err = framework.WaitForService(config.f.Client, config.f.Namespace.Name, serviceSpec.Name, true, 5*time.Second, 45*time.Second) - Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err)) - - createdService, err := config.getServiceClient().Get(serviceSpec.Name) - Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err)) - - return createdService -} - -func (config *KubeProxyTestConfig) setup() { - By("creating a selector") - selectorName := "selector-" + string(uuid.NewUUID()) - serviceSelector := map[string]string{ - selectorName: "true", - } - - By("Getting node addresses") - framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client)) - nodeList := framework.GetReadySchedulableNodesOrDie(config.f.Client) - config.externalAddrs = framework.NodeAddresses(nodeList, api.NodeExternalIP) - if len(config.externalAddrs) < 2 { - // fall back to legacy IPs - config.externalAddrs = framework.NodeAddresses(nodeList, api.NodeLegacyHostIP) - } - Expect(len(config.externalAddrs)).To(BeNumerically(">=", 2), fmt.Sprintf("At least two nodes necessary with an external or LegacyHostIP")) - config.nodes = nodeList.Items - - if enableLoadBalancerTest { - By("Creating the LoadBalancer Service on top of the pods in kubernetes") - config.createLoadBalancerService(serviceSelector) - } - - By("Creating the service pods in kubernetes") - podName := "netserver" - config.endpointPods = config.createNetProxyPods(podName, serviceSelector) - - By("Creating the service on top of the pods in kubernetes") - config.createNodePortService(serviceSelector) - - By("Creating test pods") - config.createTestPods() -} - -func (config *KubeProxyTestConfig) cleanup() { - nsClient := config.getNamespacesClient() - nsList, err := nsClient.List(api.ListOptions{}) - if err == nil { - for _, ns := range nsList.Items { - if strings.Contains(ns.Name, config.f.BaseName) && ns.Name != config.f.Namespace.Name { - nsClient.Delete(ns.Name) - } - } - } -} - -func (config *KubeProxyTestConfig) createNetProxyPods(podName string, selector map[string]string) []*api.Pod { - framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client)) - nodes := framework.GetReadySchedulableNodesOrDie(config.f.Client) - - // create pods, one for each node - createdPods := make([]*api.Pod, 0, len(nodes.Items)) - for i, n := range nodes.Items { - podName := fmt.Sprintf("%s-%d", podName, i) - pod := config.createNetShellPodSpec(podName, n.Name) - pod.ObjectMeta.Labels = selector - createdPod := config.createPod(pod) - createdPods = append(createdPods, createdPod) - } - - // wait that all of them are up - runningPods := make([]*api.Pod, 0, len(nodes.Items)) - for _, p := range createdPods { - framework.ExpectNoError(config.f.WaitForPodReady(p.Name)) - rp, err := config.getPodClient().Get(p.Name) - framework.ExpectNoError(err) - runningPods = append(runningPods, rp) - } - - return runningPods -} - -func (config *KubeProxyTestConfig) deleteNetProxyPod() { - pod := config.endpointPods[0] - config.getPodClient().Delete(pod.Name, api.NewDeleteOptions(0)) - config.endpointPods = config.endpointPods[1:] - // wait for pod being deleted. - err := framework.WaitForPodToDisappear(config.f.Client, config.f.Namespace.Name, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout) - if err != nil { - framework.Failf("Failed to delete %s pod: %v", pod.Name, err) - } - // wait for endpoint being removed. - err = framework.WaitForServiceEndpointsNum(config.f.Client, config.f.Namespace.Name, nodePortServiceName, len(config.endpointPods), time.Second, wait.ForeverTestTimeout) - if err != nil { - framework.Failf("Failed to remove endpoint from service: %s", nodePortServiceName) - } - // wait for kube-proxy to catch up with the pod being deleted. - time.Sleep(5 * time.Second) -} - -func (config *KubeProxyTestConfig) createPod(pod *api.Pod) *api.Pod { - createdPod, err := config.getPodClient().Create(pod) - if err != nil { - framework.Failf("Failed to create %s pod: %v", pod.Name, err) - } - return createdPod -} - -func (config *KubeProxyTestConfig) getPodClient() client.PodInterface { - return config.f.Client.Pods(config.f.Namespace.Name) -} - -func (config *KubeProxyTestConfig) getServiceClient() client.ServiceInterface { - return config.f.Client.Services(config.f.Namespace.Name) -} - -func (config *KubeProxyTestConfig) getNamespacesClient() client.NamespaceInterface { - return config.f.Client.Namespaces() -} - -func array2map(arr []string) map[string]bool { - retval := make(map[string]bool) - if len(arr) == 0 { - return retval - } - for _, str := range arr { - retval[str] = true - } - return retval -} diff --git a/test/e2e/networking.go b/test/e2e/networking.go index ee6bdd337da..cb74b26cb84 100644 --- a/test/e2e/networking.go +++ b/test/e2e/networking.go @@ -19,26 +19,21 @@ package e2e import ( "fmt" "net/http" - "strings" - "time" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/test/e2e/framework" . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" ) var _ = framework.KubeDescribe("Networking", func() { - f := framework.NewDefaultFramework("nettest") - var svcname = "nettest" + f := framework.NewDefaultFramework(svcname) BeforeEach(func() { - //Assert basic external connectivity. - //Since this is not really a test of kubernetes in any way, we - //leave it as a pre-test assertion, rather than a Ginko test. + // Assert basic external connectivity. + // Since this is not really a test of kubernetes in any way, we + // leave it as a pre-test assertion, rather than a Ginko test. By("Executing a successful http request from the external internet") resp, err := http.Get("http://google.com") if err != nil { @@ -79,235 +74,152 @@ var _ = framework.KubeDescribe("Networking", func() { } }) - //Now we can proceed with the test. - It("should function for intra-pod communication [Conformance]", func() { + It("should check kube-proxy urls", func() { + // TODO: this is overkill we just need the host networking pod + // to hit kube-proxy urls. + config := NewNetworkingTestConfig(f) - By(fmt.Sprintf("Creating a service named %q in namespace %q", svcname, f.Namespace.Name)) - svc, err := f.Client.Services(f.Namespace.Name).Create(&api.Service{ - ObjectMeta: api.ObjectMeta{ - Name: svcname, - Labels: map[string]string{ - "name": svcname, - }, - }, - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{{ - Protocol: "TCP", - Port: 8080, - TargetPort: intstr.FromInt(8080), - }}, - Selector: map[string]string{ - "name": svcname, - }, - }, - }) - if err != nil { - framework.Failf("unable to create test service named [%s] %v", svc.Name, err) - } - - // Clean up service - defer func() { - By("Cleaning up the service") - if err = f.Client.Services(f.Namespace.Name).Delete(svc.Name); err != nil { - framework.Failf("unable to delete svc %v: %v", svc.Name, err) - } - }() - - By("Creating a webserver (pending) pod on each node") - - framework.ExpectNoError(framework.WaitForAllNodesSchedulable(f.Client)) - nodes := framework.GetReadySchedulableNodesOrDie(f.Client) - // This test is super expensive in terms of network usage - large services - // result in huge "Endpoint" objects and all underlying pods read them - // periodically. Moreover, all KubeProxies watch all of them. - // Thus we limit the maximum number of pods under a service. - // - // TODO: Remove this limitation once services, endpoints and data flows - // between nodes and master are better optimized. - maxNodeCount := 250 - if len(nodes.Items) > maxNodeCount { - nodes.Items = nodes.Items[:maxNodeCount] - } - - if len(nodes.Items) == 1 { - // in general, the test requires two nodes. But for local development, often a one node cluster - // is created, for simplicity and speed. (see issue #10012). We permit one-node test - // only in some cases - if !framework.ProviderIs("local") { - framework.Failf(fmt.Sprintf("The test requires two Ready nodes on %s, but found just one.", framework.TestContext.Provider)) - } - framework.Logf("Only one ready node is detected. The test has limited scope in such setting. " + - "Rerun it with at least two nodes to get complete coverage.") - } - - podNames := LaunchNetTestPodPerNode(f, nodes, svcname) - - // Clean up the pods - defer func() { - By("Cleaning up the webserver pods") - for _, podName := range podNames { - if err = f.Client.Pods(f.Namespace.Name).Delete(podName, nil); err != nil { - framework.Logf("Failed to delete pod %s: %v", podName, err) - } - } - }() - - By("Waiting for the webserver pods to transition to Running state") - for _, podName := range podNames { - err = f.WaitForPodRunning(podName) - Expect(err).NotTo(HaveOccurred()) - } - - By("Waiting for connectivity to be verified") - passed := false - - //once response OK, evaluate response body for pass/fail. - var body []byte - getDetails := func() ([]byte, error) { - proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get()) - if errProxy != nil { - return nil, errProxy - } - return proxyRequest.Namespace(f.Namespace.Name). - Name(svc.Name). - Suffix("read"). - DoRaw() - } - - getStatus := func() ([]byte, error) { - proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get()) - if errProxy != nil { - return nil, errProxy - } - return proxyRequest.Namespace(f.Namespace.Name). - Name(svc.Name). - Suffix("status"). - DoRaw() - } - - // nettest containers will wait for all service endpoints to come up for 2 minutes - // apply a 3 minutes observation period here to avoid this test to time out before the nettest starts to contact peers - timeout := time.Now().Add(3 * time.Minute) - for i := 0; !passed && timeout.After(time.Now()); i++ { - time.Sleep(2 * time.Second) - framework.Logf("About to make a proxy status call") - start := time.Now() - body, err = getStatus() - framework.Logf("Proxy status call returned in %v", time.Since(start)) - if err != nil { - framework.Logf("Attempt %v: service/pod still starting. (error: '%v')", i, err) - continue - } - // Finally, we pass/fail the test based on if the container's response body, as to whether or not it was able to find peers. - switch { - case string(body) == "pass": - framework.Logf("Passed on attempt %v. Cleaning up.", i) - passed = true - case string(body) == "running": - framework.Logf("Attempt %v: test still running", i) - case string(body) == "fail": - if body, err = getDetails(); err != nil { - framework.Failf("Failed on attempt %v. Cleaning up. Error reading details: %v", i, err) - } else { - framework.Failf("Failed on attempt %v. Cleaning up. Details:\n%s", i, string(body)) - } - case strings.Contains(string(body), "no endpoints available"): - framework.Logf("Attempt %v: waiting on service/endpoints", i) - default: - framework.Logf("Unexpected response:\n%s", body) - } - } - - if !passed { - if body, err = getDetails(); err != nil { - framework.Failf("Timed out. Cleaning up. Error reading details: %v", err) - } else { - framework.Failf("Timed out. Cleaning up. Details:\n%s", string(body)) - } - } - Expect(string(body)).To(Equal("pass")) + By("checking kube-proxy URLs") + config.getSelfURL("/healthz", "ok") + config.getSelfURL("/proxyMode", "iptables") // the default }) - framework.KubeDescribe("Granular Checks", func() { + framework.KubeDescribe("Granular Checks: Pods", func() { - connectivityTimeout := 10 - - It("should function for pod communication on a single node", func() { - - By("Picking a node") - nodes := framework.GetReadySchedulableNodesOrDie(f.Client) - node := nodes.Items[0] - - By("Creating a webserver pod") - podName := "same-node-webserver" - defer f.Client.Pods(f.Namespace.Name).Delete(podName, nil) - ip := framework.LaunchWebserverPod(f, podName, node.Name) - - By("Checking that the webserver is accessible from a pod on the same node") - framework.ExpectNoError(framework.CheckConnectivityToHost(f, node.Name, "same-node-wget", ip, connectivityTimeout)) - }) - - It("should function for pod communication between nodes", func() { - - podClient := f.Client.Pods(f.Namespace.Name) - - By("Picking multiple nodes") - nodes := framework.GetReadySchedulableNodesOrDie(f.Client) - - if len(nodes.Items) == 1 { - framework.Skipf("The test requires two Ready nodes on %s, but found just one.", framework.TestContext.Provider) + // Try to hit all endpoints through a test container, retry 5 times, + // expect exactly one unique hostname. Each of these endpoints reports + // its own hostname. + It("should function for intra-pod communication: http [Conformance]", func() { + config := NewNetworkingTestConfig(f) + for _, endpointPod := range config.endpointPods { + config.dialFromTestContainer("http", endpointPod.Status.PodIP, endpointHttpPort, config.maxTries, 0, sets.NewString(endpointPod.Name)) } - - node1 := nodes.Items[0] - node2 := nodes.Items[1] - - By("Creating a webserver pod") - podName := "different-node-webserver" - defer podClient.Delete(podName, nil) - ip := framework.LaunchWebserverPod(f, podName, node1.Name) - - By("Checking that the webserver is accessible from a pod on a different node") - framework.ExpectNoError(framework.CheckConnectivityToHost(f, node2.Name, "different-node-wget", ip, connectivityTimeout)) }) + + It("should function for intra-pod communication: udp [Conformance]", func() { + config := NewNetworkingTestConfig(f) + for _, endpointPod := range config.endpointPods { + config.dialFromTestContainer("udp", endpointPod.Status.PodIP, endpointUdpPort, config.maxTries, 0, sets.NewString(endpointPod.Name)) + } + }) + + It("should function for node-pod communication: http [Conformance]", func() { + config := NewNetworkingTestConfig(f) + for _, endpointPod := range config.endpointPods { + config.dialFromNode("http", endpointPod.Status.PodIP, endpointHttpPort, config.maxTries, 0, sets.NewString(endpointPod.Name)) + } + }) + + It("should function for node-pod communication: udp [Conformance]", func() { + config := NewNetworkingTestConfig(f) + for _, endpointPod := range config.endpointPods { + config.dialFromNode("udp", endpointPod.Status.PodIP, endpointUdpPort, config.maxTries, 0, sets.NewString(endpointPod.Name)) + } + }) + }) + + // TODO: Remove [Slow] when this has had enough bake time to prove presubmit worthiness. + framework.KubeDescribe("Granular Checks: Services [Slow]", func() { + + It("should function for pod-Service: http", func() { + config := NewNetworkingTestConfig(f) + By(fmt.Sprintf("dialing(http) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterHttpPort)) + config.dialFromTestContainer("http", config.clusterIP, clusterHttpPort, config.maxTries, 0, config.endpointHostnames()) + + By(fmt.Sprintf("dialing(http) %v --> %v:%v (nodeIP)", config.testContainerPod.Name, config.externalAddrs[0], config.nodeHttpPort)) + config.dialFromTestContainer("http", config.nodeIP, config.nodeHttpPort, config.maxTries, 0, config.endpointHostnames()) + }) + + It("should function for pod-Service: udp", func() { + config := NewNetworkingTestConfig(f) + By(fmt.Sprintf("dialing(udp) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterUdpPort)) + config.dialFromTestContainer("udp", config.clusterIP, clusterUdpPort, config.maxTries, 0, config.endpointHostnames()) + + By(fmt.Sprintf("dialing(udp) %v --> %v:%v (nodeIP)", config.testContainerPod.Name, config.externalAddrs[0], config.nodeUdpPort)) + config.dialFromTestContainer("udp", config.nodeIP, config.nodeUdpPort, config.maxTries, 0, config.endpointHostnames()) + }) + + It("should function for node-Service: http", func() { + config := NewNetworkingTestConfig(f) + By(fmt.Sprintf("dialing(http) %v (node) --> %v:%v (config.clusterIP)", config.nodeIP, config.clusterIP, clusterHttpPort)) + config.dialFromNode("http", config.clusterIP, clusterHttpPort, config.maxTries, 0, config.endpointHostnames()) + + By(fmt.Sprintf("dialing(http) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeHttpPort)) + config.dialFromNode("http", config.nodeIP, config.nodeHttpPort, config.maxTries, 0, config.endpointHostnames()) + }) + + It("should function for node-Service: udp", func() { + config := NewNetworkingTestConfig(f) + By(fmt.Sprintf("dialing(udp) %v (node) --> %v:%v (config.clusterIP)", config.nodeIP, config.clusterIP, clusterUdpPort)) + config.dialFromNode("udp", config.clusterIP, clusterUdpPort, config.maxTries, 0, config.endpointHostnames()) + + By(fmt.Sprintf("dialing(udp) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeUdpPort)) + config.dialFromNode("udp", config.nodeIP, config.nodeUdpPort, config.maxTries, 0, config.endpointHostnames()) + }) + + It("should function for endpoint-Service: http", func() { + config := NewNetworkingTestConfig(f) + By(fmt.Sprintf("dialing(http) %v (endpoint) --> %v:%v (config.clusterIP)", config.endpointPods[0].Name, config.clusterIP, clusterHttpPort)) + config.dialFromEndpointContainer("http", config.clusterIP, clusterHttpPort, config.maxTries, 0, config.endpointHostnames()) + + By(fmt.Sprintf("dialing(http) %v (endpoint) --> %v:%v (nodeIP)", config.endpointPods[0].Name, config.nodeIP, config.nodeHttpPort)) + config.dialFromEndpointContainer("http", config.nodeIP, config.nodeHttpPort, config.maxTries, 0, config.endpointHostnames()) + }) + + It("should function for endpoint-Service: udp", func() { + config := NewNetworkingTestConfig(f) + By(fmt.Sprintf("dialing(udp) %v (endpoint) --> %v:%v (config.clusterIP)", config.endpointPods[0].Name, config.clusterIP, clusterUdpPort)) + config.dialFromEndpointContainer("udp", config.clusterIP, clusterUdpPort, config.maxTries, 0, config.endpointHostnames()) + + By(fmt.Sprintf("dialing(udp) %v (endpoint) --> %v:%v (nodeIP)", config.endpointPods[0].Name, config.nodeIP, config.nodeUdpPort)) + config.dialFromEndpointContainer("udp", config.nodeIP, config.nodeUdpPort, config.maxTries, 0, config.endpointHostnames()) + }) + + It("should update endpoints: http", func() { + config := NewNetworkingTestConfig(f) + By(fmt.Sprintf("dialing(http) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterHttpPort)) + config.dialFromTestContainer("http", config.clusterIP, clusterHttpPort, config.maxTries, 0, config.endpointHostnames()) + + config.deleteNetProxyPod() + + By(fmt.Sprintf("dialing(http) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterHttpPort)) + config.dialFromTestContainer("http", config.clusterIP, clusterHttpPort, config.maxTries, config.maxTries, config.endpointHostnames()) + }) + + It("should update endpoints: udp", func() { + config := NewNetworkingTestConfig(f) + By(fmt.Sprintf("dialing(udp) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterUdpPort)) + config.dialFromTestContainer("udp", config.clusterIP, clusterUdpPort, config.maxTries, 0, config.endpointHostnames()) + + config.deleteNetProxyPod() + + By(fmt.Sprintf("dialing(udp) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterUdpPort)) + config.dialFromTestContainer("udp", config.clusterIP, clusterUdpPort, config.maxTries, config.maxTries, config.endpointHostnames()) + }) + + // Slow because we confirm that the nodePort doesn't serve traffic, which requires a period of polling. + It("should update nodePort: http [Slow]", func() { + config := NewNetworkingTestConfig(f) + By(fmt.Sprintf("dialing(http) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeHttpPort)) + config.dialFromNode("http", config.nodeIP, config.nodeHttpPort, config.maxTries, 0, config.endpointHostnames()) + + config.deleteNodePortService() + + By(fmt.Sprintf("dialing(http) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeHttpPort)) + config.dialFromNode("http", config.nodeIP, config.nodeHttpPort, config.maxTries, config.maxTries, sets.NewString()) + }) + + // Slow because we confirm that the nodePort doesn't serve traffic, which requires a period of polling. + It("should update nodePort: udp [Slow]", func() { + config := NewNetworkingTestConfig(f) + By(fmt.Sprintf("dialing(udp) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeUdpPort)) + config.dialFromNode("udp", config.nodeIP, config.nodeUdpPort, config.maxTries, 0, config.endpointHostnames()) + + config.deleteNodePortService() + + By(fmt.Sprintf("dialing(udp) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeUdpPort)) + config.dialFromNode("udp", config.nodeIP, config.nodeUdpPort, config.maxTries, config.maxTries, sets.NewString()) + }) + // TODO: Test sessionAffinity #31712 }) }) - -func LaunchNetTestPodPerNode(f *framework.Framework, nodes *api.NodeList, name string) []string { - podNames := []string{} - - totalPods := len(nodes.Items) - - Expect(totalPods).NotTo(Equal(0)) - - for _, node := range nodes.Items { - pod, err := f.Client.Pods(f.Namespace.Name).Create(&api.Pod{ - ObjectMeta: api.ObjectMeta{ - GenerateName: name + "-", - Labels: map[string]string{ - "name": name, - }, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "webserver", - Image: "gcr.io/google_containers/nettest:1.9", - Args: []string{ - "-service=" + name, - //peers >= totalPods should be asserted by the container. - //the nettest container finds peers by looking up list of svc endpoints. - fmt.Sprintf("-peers=%d", totalPods), - "-namespace=" + f.Namespace.Name}, - Ports: []api.ContainerPort{{ContainerPort: 8080}}, - }, - }, - NodeName: node.Name, - RestartPolicy: api.RestartPolicyNever, - }, - }) - Expect(err).NotTo(HaveOccurred()) - framework.Logf("Created pod %s on node %s", pod.ObjectMeta.Name, node.Name) - podNames = append(podNames, pod.ObjectMeta.Name) - } - return podNames -} diff --git a/test/e2e/networking_utils.go b/test/e2e/networking_utils.go new file mode 100644 index 00000000000..00762da9654 --- /dev/null +++ b/test/e2e/networking_utils.go @@ -0,0 +1,506 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + api "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apimachinery/registered" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/uuid" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/test/e2e/framework" +) + +const ( + endpointHttpPort = 8080 + endpointUdpPort = 8081 + testContainerHttpPort = 8080 + clusterHttpPort = 80 + clusterUdpPort = 90 + netexecImageName = "gcr.io/google_containers/netexec:1.5" + hostexecImageName = "gcr.io/google_containers/hostexec:1.2" + testPodName = "test-container-pod" + hostTestPodName = "host-test-container-pod" + nodePortServiceName = "node-port-service" + hitEndpointRetryDelay = 1 * time.Second + // Number of retries to hit a given set of endpoints. Needs to be high + // because we verify iptables statistical rr loadbalancing. + testTries = 30 +) + +// NewNetworkingTestConfig creates and sets up a new test config helper. +func NewNetworkingTestConfig(f *framework.Framework) *NetworkingTestConfig { + config := &NetworkingTestConfig{f: f, ns: f.Namespace.Name} + By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.ns)) + config.setup() + return config +} + +// NetworkingTestConfig is a convenience class around some utility methods +// for testing kubeproxy/networking/services/endpoints. +type NetworkingTestConfig struct { + // testContaienrPod is a test pod running the netexec image. It is capable + // of executing tcp/udp requests against ip:port. + testContainerPod *api.Pod + // hostTestContainerPod is a pod running with hostNetworking=true, and the + // hostexec image. + hostTestContainerPod *api.Pod + // endpointPods are the pods belonging to the Service created by this + // test config. Each invocation of `setup` creates a service with + // 1 pod per node running the netexecImage. + endpointPods []*api.Pod + f *framework.Framework + // nodePortService is a Service with Type=NodePort spanning over all + // endpointPods. + nodePortService *api.Service + // externalAddrs is a list of external IPs of nodes in the cluster. + externalAddrs []string + // nodes is a list of nodes in the cluster. + nodes []api.Node + // maxTries is the number of retries tolerated for tests run against + // endpoints and services created by this config. + maxTries int + // The clusterIP of the Service reated by this test config. + clusterIP string + // External ip of first node for use in nodePort testing. + nodeIP string + // The http/udp nodePorts of the Service. + nodeHttpPort int + nodeUdpPort int + // The kubernetes namespace within which all resources for this + // config are created + ns string +} + +func (config *NetworkingTestConfig) dialFromEndpointContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) { + config.dialFromContainer(protocol, config.endpointPods[0].Status.PodIP, targetIP, endpointHttpPort, targetPort, maxTries, minTries, expectedEps) +} + +func (config *NetworkingTestConfig) dialFromTestContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) { + config.dialFromContainer(protocol, config.testContainerPod.Status.PodIP, targetIP, testContainerHttpPort, targetPort, maxTries, minTries, expectedEps) +} + +// diagnoseMissingEndpoints prints debug information about the endpoints that +// are NOT in the given list of foundEndpoints. These are the endpoints we +// expected a response from. +func (config *NetworkingTestConfig) diagnoseMissingEndpoints(foundEndpoints sets.String) { + for _, e := range config.endpointPods { + if foundEndpoints.Has(e.Name) { + continue + } + framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", e.Namespace, e.Name) + desc, _ := framework.RunKubectl( + "describe", "pod", e.Name, fmt.Sprintf("--namespace=%v", e.Namespace)) + framework.Logf(desc) + } +} + +// endpointHostnames returns a set of hostnames for existing endpoints. +func (config *NetworkingTestConfig) endpointHostnames() sets.String { + expectedEps := sets.NewString() + for _, p := range config.endpointPods { + expectedEps.Insert(p.Name) + } + return expectedEps +} + +// dialFromContainers executes a curl via kubectl exec in a test container, +// which might then translate to a tcp or udp request based on the protocol +// argument in the url. +// - minTries is the minimum number of curl attempts required before declaring +// success. Set to 0 if you'd like to return as soon as all endpoints respond +// at least once. +// - maxTries is the maximum number of curl attempts. If this many attempts pass +// and we don't see all expected endpoints, the test fails. +// - expectedEps is the set of endpointnames to wait for. Typically this is also +// the hostname reported by each pod in the service through /hostName. +// maxTries == minTries will confirm that we see the expected endpoints and no +// more for maxTries. Use this if you want to eg: fail a readiness check on a +// pod and confirm it doesn't show up as an endpoint. +func (config *NetworkingTestConfig) dialFromContainer(protocol, containerIP, targetIP string, containerHttpPort, targetPort, maxTries, minTries int, expectedEps sets.String) { + cmd := fmt.Sprintf("curl -q -s 'http://%s:%d/dial?request=hostName&protocol=%s&host=%s&port=%d&tries=1'", + containerIP, + containerHttpPort, + protocol, + targetIP, + targetPort) + + eps := sets.NewString() + + for i := 0; i < maxTries; i++ { + stdout, err := framework.RunHostCmd(config.ns, config.hostTestContainerPod.Name, cmd) + if err != nil { + // A failure to kubectl exec counts as a try, not a hard fail. + // Also note that we will keep failing for maxTries in tests where + // we confirm unreachability. + framework.Logf("Failed to execute %v: %v", cmd, err) + } else { + var output map[string][]string + if err := json.Unmarshal([]byte(stdout), &output); err != nil { + framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v", + cmd, config.hostTestContainerPod.Name, stdout, err) + continue + } + for _, hostName := range output["responses"] { + eps.Insert(hostName) + } + } + framework.Logf("Waiting for endpoints: %v", expectedEps.Difference(eps)) + + // Check against i+1 so we exit if minTries == maxTries. + if (eps.Equal(expectedEps) || eps.Len() == 0 && expectedEps.Len() == 0) && i+1 >= minTries { + return + } + } + + config.diagnoseMissingEndpoints(eps) + framework.Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", minTries, cmd, eps, expectedEps) +} + +// dialFromNode executes a tcp or udp request based on protocol via kubectl exec +// in a test container running with host networking. +// - minTries is the minimum number of curl attempts required before declaring +// success. Set to 0 if you'd like to return as soon as all endpoints respond +// at least once. +// - maxTries is the maximum number of curl attempts. If this many attempts pass +// and we don't see all expected endpoints, the test fails. +// maxTries == minTries will confirm that we see the expected endpoints and no +// more for maxTries. Use this if you want to eg: fail a readiness check on a +// pod and confirm it doesn't show up as an endpoint. +func (config *NetworkingTestConfig) dialFromNode(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) { + var cmd string + if protocol == "udp" { + cmd = fmt.Sprintf("echo 'hostName' | timeout -t 3 nc -w 1 -u %s %d", targetIP, targetPort) + } else { + cmd = fmt.Sprintf("curl -q -s --connect-timeout 1 http://%s:%d/hostName", targetIP, targetPort) + } + + // TODO: This simply tells us that we can reach the endpoints. Check that + // the probability of hitting a specific endpoint is roughly the same as + // hitting any other. + eps := sets.NewString() + + filterCmd := fmt.Sprintf("%s | grep -v '^\\s*$'", cmd) + for i := 0; i < maxTries; i++ { + stdout, err := framework.RunHostCmd(config.ns, config.hostTestContainerPod.Name, filterCmd) + if err != nil { + // A failure to kubectl exec counts as a try, not a hard fail. + // Also note that we will keep failing for maxTries in tests where + // we confirm unreachability. + framework.Logf("Failed to execute %v: %v", filterCmd, err) + } else { + eps.Insert(strings.TrimSpace(stdout)) + } + framework.Logf("Waiting for %+v endpoints, got endpoints %+v", expectedEps.Difference(eps), eps) + + // Check against i+1 so we exit if minTries == maxTries. + if (eps.Equal(expectedEps) || eps.Len() == 0 && expectedEps.Len() == 0) && i+1 >= minTries { + return + } + } + + config.diagnoseMissingEndpoints(eps) + framework.Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", minTries, cmd, eps, expectedEps) +} + +// getSelfURL executes a curl against the given path via kubectl exec into a +// test container running with host networking, and fails if the output +// doesn't match the expected string. +func (config *NetworkingTestConfig) getSelfURL(path string, expected string) { + cmd := fmt.Sprintf("curl -q -s --connect-timeout 1 http://localhost:10249%s", path) + By(fmt.Sprintf("Getting kube-proxy self URL %s", path)) + stdout := framework.RunHostCmdOrDie(config.ns, config.hostTestContainerPod.Name, cmd) + Expect(strings.Contains(stdout, expected)).To(BeTrue()) +} + +func (config *NetworkingTestConfig) createNetShellPodSpec(podName string, node string) *api.Pod { + probe := &api.Probe{ + InitialDelaySeconds: 10, + TimeoutSeconds: 30, + PeriodSeconds: 10, + SuccessThreshold: 1, + FailureThreshold: 3, + Handler: api.Handler{ + HTTPGet: &api.HTTPGetAction{ + Path: "/healthz", + Port: intstr.IntOrString{IntVal: endpointHttpPort}, + }, + }, + } + pod := &api.Pod{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Pod", + APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(), + }, + ObjectMeta: api.ObjectMeta{ + Name: podName, + Namespace: config.ns, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "webserver", + Image: netexecImageName, + ImagePullPolicy: api.PullIfNotPresent, + Command: []string{ + "/netexec", + fmt.Sprintf("--http-port=%d", endpointHttpPort), + fmt.Sprintf("--udp-port=%d", endpointUdpPort), + }, + Ports: []api.ContainerPort{ + { + Name: "http", + ContainerPort: endpointHttpPort, + }, + { + Name: "udp", + ContainerPort: endpointUdpPort, + Protocol: api.ProtocolUDP, + }, + }, + LivenessProbe: probe, + ReadinessProbe: probe, + }, + }, + NodeName: node, + }, + } + return pod +} + +func (config *NetworkingTestConfig) createTestPodSpec() *api.Pod { + pod := &api.Pod{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Pod", + APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(), + }, + ObjectMeta: api.ObjectMeta{ + Name: testPodName, + Namespace: config.ns, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "webserver", + Image: netexecImageName, + ImagePullPolicy: api.PullIfNotPresent, + Command: []string{ + "/netexec", + fmt.Sprintf("--http-port=%d", endpointHttpPort), + fmt.Sprintf("--udp-port=%d", endpointUdpPort), + }, + Ports: []api.ContainerPort{ + { + Name: "http", + ContainerPort: testContainerHttpPort, + }, + }, + }, + }, + }, + } + return pod +} + +func (config *NetworkingTestConfig) createNodePortService(selector map[string]string) { + serviceSpec := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: nodePortServiceName, + }, + Spec: api.ServiceSpec{ + Type: api.ServiceTypeNodePort, + Ports: []api.ServicePort{ + {Port: clusterHttpPort, Name: "http", Protocol: api.ProtocolTCP, TargetPort: intstr.FromInt(endpointHttpPort)}, + {Port: clusterUdpPort, Name: "udp", Protocol: api.ProtocolUDP, TargetPort: intstr.FromInt(endpointUdpPort)}, + }, + Selector: selector, + }, + } + config.nodePortService = config.createService(serviceSpec) +} + +func (config *NetworkingTestConfig) deleteNodePortService() { + err := config.getServiceClient().Delete(config.nodePortService.Name) + Expect(err).NotTo(HaveOccurred(), "error while deleting NodePortService. err:%v)", err) + time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted. +} + +func (config *NetworkingTestConfig) createTestPods() { + testContainerPod := config.createTestPodSpec() + hostTestContainerPod := framework.NewHostExecPodSpec(config.ns, hostTestPodName) + + config.createPod(testContainerPod) + config.createPod(hostTestContainerPod) + + framework.ExpectNoError(config.f.WaitForPodRunning(testContainerPod.Name)) + framework.ExpectNoError(config.f.WaitForPodRunning(hostTestContainerPod.Name)) + + var err error + config.testContainerPod, err = config.getPodClient().Get(testContainerPod.Name) + if err != nil { + framework.Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err) + } + + config.hostTestContainerPod, err = config.getPodClient().Get(hostTestContainerPod.Name) + if err != nil { + framework.Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err) + } +} + +func (config *NetworkingTestConfig) createService(serviceSpec *api.Service) *api.Service { + _, err := config.getServiceClient().Create(serviceSpec) + Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err)) + + err = framework.WaitForService(config.f.Client, config.ns, serviceSpec.Name, true, 5*time.Second, 45*time.Second) + Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err)) + + createdService, err := config.getServiceClient().Get(serviceSpec.Name) + Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err)) + + return createdService +} + +func (config *NetworkingTestConfig) setup() { + By("creating a selector") + selectorName := "selector-" + string(uuid.NewUUID()) + serviceSelector := map[string]string{ + selectorName: "true", + } + + By("Getting node addresses") + framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client)) + nodeList := framework.GetReadySchedulableNodesOrDie(config.f.Client) + config.externalAddrs = framework.NodeAddresses(nodeList, api.NodeExternalIP) + if len(config.externalAddrs) < 2 { + // fall back to legacy IPs + config.externalAddrs = framework.NodeAddresses(nodeList, api.NodeLegacyHostIP) + } + Expect(len(config.externalAddrs)).To(BeNumerically(">=", 2), fmt.Sprintf("At least two nodes necessary with an external or LegacyHostIP")) + config.nodes = nodeList.Items + + By("Creating the service pods in kubernetes") + podName := "netserver" + config.endpointPods = config.createNetProxyPods(podName, serviceSelector) + + By("Creating the service on top of the pods in kubernetes") + config.createNodePortService(serviceSelector) + + By("Creating test pods") + config.createTestPods() + for _, p := range config.nodePortService.Spec.Ports { + switch p.Protocol { + case api.ProtocolUDP: + config.nodeUdpPort = int(p.NodePort) + case api.ProtocolTCP: + config.nodeHttpPort = int(p.NodePort) + default: + continue + } + } + + epCount := len(config.endpointPods) + config.maxTries = epCount*epCount + testTries + config.clusterIP = config.nodePortService.Spec.ClusterIP + config.nodeIP = config.externalAddrs[0] +} + +func (config *NetworkingTestConfig) cleanup() { + nsClient := config.getNamespacesClient() + nsList, err := nsClient.List(api.ListOptions{}) + if err == nil { + for _, ns := range nsList.Items { + if strings.Contains(ns.Name, config.f.BaseName) && ns.Name != config.ns { + nsClient.Delete(ns.Name) + } + } + } +} + +func (config *NetworkingTestConfig) createNetProxyPods(podName string, selector map[string]string) []*api.Pod { + framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client)) + nodes := framework.GetReadySchedulableNodesOrDie(config.f.Client) + + // create pods, one for each node + createdPods := make([]*api.Pod, 0, len(nodes.Items)) + for i, n := range nodes.Items { + podName := fmt.Sprintf("%s-%d", podName, i) + pod := config.createNetShellPodSpec(podName, n.Name) + pod.ObjectMeta.Labels = selector + createdPod := config.createPod(pod) + createdPods = append(createdPods, createdPod) + } + + // wait that all of them are up + runningPods := make([]*api.Pod, 0, len(nodes.Items)) + for _, p := range createdPods { + framework.ExpectNoError(config.f.WaitForPodReady(p.Name)) + rp, err := config.getPodClient().Get(p.Name) + framework.ExpectNoError(err) + runningPods = append(runningPods, rp) + } + + return runningPods +} + +func (config *NetworkingTestConfig) deleteNetProxyPod() { + pod := config.endpointPods[0] + config.getPodClient().Delete(pod.Name, api.NewDeleteOptions(0)) + config.endpointPods = config.endpointPods[1:] + // wait for pod being deleted. + err := framework.WaitForPodToDisappear(config.f.Client, config.ns, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout) + if err != nil { + framework.Failf("Failed to delete %s pod: %v", pod.Name, err) + } + // wait for endpoint being removed. + err = framework.WaitForServiceEndpointsNum(config.f.Client, config.ns, nodePortServiceName, len(config.endpointPods), time.Second, wait.ForeverTestTimeout) + if err != nil { + framework.Failf("Failed to remove endpoint from service: %s", nodePortServiceName) + } + // wait for kube-proxy to catch up with the pod being deleted. + time.Sleep(5 * time.Second) +} + +func (config *NetworkingTestConfig) createPod(pod *api.Pod) *api.Pod { + createdPod, err := config.getPodClient().Create(pod) + if err != nil { + framework.Failf("Failed to create %s pod: %v", pod.Name, err) + } + return createdPod +} + +func (config *NetworkingTestConfig) getPodClient() client.PodInterface { + return config.f.Client.Pods(config.ns) +} + +func (config *NetworkingTestConfig) getServiceClient() client.ServiceInterface { + return config.f.Client.Services(config.ns) +} + +func (config *NetworkingTestConfig) getNamespacesClient() client.NamespaceInterface { + return config.f.Client.Namespaces() +}