Networking test rewrite

This commit is contained in:
bprashanth 2016-08-24 18:08:12 -07:00
parent e976bc9ed2
commit 6b5d7e6d93
3 changed files with 652 additions and 811 deletions

View File

@ -1,577 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
api "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
)
const (
endpointHttpPort = 8080
endpointUdpPort = 8081
testContainerHttpPort = 8080
clusterHttpPort = 80
clusterUdpPort = 90
nodeHttpPort = 32080
nodeUdpPort = 32081
loadBalancerHttpPort = 100
netexecImageName = "gcr.io/google_containers/netexec:1.5"
testPodName = "test-container-pod"
hostTestPodName = "host-test-container-pod"
nodePortServiceName = "node-port-service"
loadBalancerServiceName = "load-balancer-service"
enableLoadBalancerTest = false
hitEndpointRetryDelay = 1 * time.Second
// Number of retries to hit a given set of endpoints. Needs to be high
// because we verify iptables statistical rr loadbalancing.
testTries = 30
)
type KubeProxyTestConfig struct {
testContainerPod *api.Pod
hostTestContainerPod *api.Pod
endpointPods []*api.Pod
f *framework.Framework
nodePortService *api.Service
loadBalancerService *api.Service
externalAddrs []string
nodes []api.Node
}
var _ = framework.KubeDescribe("KubeProxy", func() {
f := framework.NewDefaultFramework("e2e-kubeproxy")
config := &KubeProxyTestConfig{
f: f,
}
// Slow issue #14204 (10 min)
It("should test kube-proxy [Slow]", func() {
By("cleaning up any pre-existing namespaces used by this test")
config.cleanup()
By("Setting up for the tests")
config.setup()
//TODO Need to add hit externalIPs test
By("TODO: Need to add hit externalIPs test")
By("Hit Test with All Endpoints")
config.hitAll()
config.deleteNetProxyPod()
By("Hit Test with Fewer Endpoints")
config.hitAll()
By("Deleting nodePortservice and ensuring that service cannot be hit")
config.deleteNodePortService()
config.hitNodePort(0) // expect 0 endpoints to be hit
if enableLoadBalancerTest {
By("Deleting loadBalancerService and ensuring that service cannot be hit")
config.deleteLoadBalancerService()
config.hitLoadBalancer(0) // expect 0 endpoints to be hit
}
})
})
func (config *KubeProxyTestConfig) hitAll() {
By("Hitting endpoints from host and container")
config.hitEndpoints()
By("Hitting clusterIP from host and container")
config.hitClusterIP(len(config.endpointPods))
By("Hitting nodePort from host and container")
config.hitNodePort(len(config.endpointPods))
if enableLoadBalancerTest {
By("Waiting for LoadBalancer Ingress Setup")
config.waitForLoadBalancerIngressSetup()
By("Hitting LoadBalancer")
config.hitLoadBalancer(len(config.endpointPods))
}
}
func (config *KubeProxyTestConfig) hitLoadBalancer(epCount int) {
lbIP := config.loadBalancerService.Status.LoadBalancer.Ingress[0].IP
hostNames := make(map[string]bool)
tries := epCount*epCount + 5
for i := 0; i < tries; i++ {
transport := utilnet.SetTransportDefaults(&http.Transport{})
httpClient := createHTTPClient(transport)
resp, err := httpClient.Get(fmt.Sprintf("http://%s:%d/hostName", lbIP, loadBalancerHttpPort))
if err == nil {
defer resp.Body.Close()
hostName, err := ioutil.ReadAll(resp.Body)
if err == nil {
hostNames[string(hostName)] = true
}
}
transport.CloseIdleConnections()
}
Expect(len(hostNames)).To(BeNumerically("==", epCount), "LoadBalancer did not hit all pods")
}
func createHTTPClient(transport *http.Transport) *http.Client {
client := &http.Client{
Transport: transport,
Timeout: 5 * time.Second,
}
return client
}
func (config *KubeProxyTestConfig) hitClusterIP(epCount int) {
clusterIP := config.nodePortService.Spec.ClusterIP
tries := epCount*epCount + testTries // if epCount == 0
By("dialing(udp) node1 --> clusterIP:clusterUdpPort")
config.dialFromNode("udp", clusterIP, clusterUdpPort, tries, epCount)
By("dialing(http) node1 --> clusterIP:clusterHttpPort")
config.dialFromNode("http", clusterIP, clusterHttpPort, tries, epCount)
By("dialing(udp) test container --> clusterIP:clusterUdpPort")
config.dialFromTestContainer("udp", clusterIP, clusterUdpPort, tries, epCount)
By("dialing(http) test container --> clusterIP:clusterHttpPort")
config.dialFromTestContainer("http", clusterIP, clusterHttpPort, tries, epCount)
By("dialing(udp) endpoint container --> clusterIP:clusterUdpPort")
config.dialFromEndpointContainer("udp", clusterIP, clusterUdpPort, tries, epCount)
By("dialing(http) endpoint container --> clusterIP:clusterHttpPort")
config.dialFromEndpointContainer("http", clusterIP, clusterHttpPort, tries, epCount)
}
func (config *KubeProxyTestConfig) hitNodePort(epCount int) {
node1_IP := config.externalAddrs[0]
tries := epCount*epCount + testTries // if epCount == 0
By("dialing(udp) node1 --> node1:nodeUdpPort")
config.dialFromNode("udp", node1_IP, nodeUdpPort, tries, epCount)
By("dialing(http) node1 --> node1:nodeHttpPort")
config.dialFromNode("http", node1_IP, nodeHttpPort, tries, epCount)
By("dialing(udp) test container --> node1:nodeUdpPort")
config.dialFromTestContainer("udp", node1_IP, nodeUdpPort, tries, epCount)
By("dialing(http) test container --> node1:nodeHttpPort")
config.dialFromTestContainer("http", node1_IP, nodeHttpPort, tries, epCount)
By("dialing(udp) endpoint container --> node1:nodeUdpPort")
config.dialFromEndpointContainer("udp", node1_IP, nodeUdpPort, tries, epCount)
By("dialing(http) endpoint container --> node1:nodeHttpPort")
config.dialFromEndpointContainer("http", node1_IP, nodeHttpPort, tries, epCount)
By("dialing(udp) node --> 127.0.0.1:nodeUdpPort")
config.dialFromNode("udp", "127.0.0.1", nodeUdpPort, tries, epCount)
By("dialing(http) node --> 127.0.0.1:nodeHttpPort")
config.dialFromNode("http", "127.0.0.1", nodeHttpPort, tries, epCount)
node2_IP := config.externalAddrs[1]
By("dialing(udp) node1 --> node2:nodeUdpPort")
config.dialFromNode("udp", node2_IP, nodeUdpPort, tries, epCount)
By("dialing(http) node1 --> node2:nodeHttpPort")
config.dialFromNode("http", node2_IP, nodeHttpPort, tries, epCount)
By("checking kube-proxy URLs")
config.getSelfURL("/healthz", "ok")
config.getSelfURL("/proxyMode", "iptables") // the default
}
func (config *KubeProxyTestConfig) hitEndpoints() {
for _, endpointPod := range config.endpointPods {
Expect(len(endpointPod.Status.PodIP)).To(BeNumerically(">", 0), "podIP is empty:%s", endpointPod.Status.PodIP)
By("dialing(udp) endpointPodIP:endpointUdpPort from node1")
config.dialFromNode("udp", endpointPod.Status.PodIP, endpointUdpPort, 5, 1)
By("dialing(http) endpointPodIP:endpointHttpPort from node1")
config.dialFromNode("http", endpointPod.Status.PodIP, endpointHttpPort, 5, 1)
By("dialing(udp) endpointPodIP:endpointUdpPort from test container")
config.dialFromTestContainer("udp", endpointPod.Status.PodIP, endpointUdpPort, 5, 1)
By("dialing(http) endpointPodIP:endpointHttpPort from test container")
config.dialFromTestContainer("http", endpointPod.Status.PodIP, endpointHttpPort, 5, 1)
}
}
func (config *KubeProxyTestConfig) dialFromEndpointContainer(protocol, targetIP string, targetPort, tries, expectedCount int) {
config.dialFromContainer(protocol, config.endpointPods[0].Status.PodIP, targetIP, endpointHttpPort, targetPort, tries, expectedCount)
}
func (config *KubeProxyTestConfig) dialFromTestContainer(protocol, targetIP string, targetPort, tries, expectedCount int) {
config.dialFromContainer(protocol, config.testContainerPod.Status.PodIP, targetIP, testContainerHttpPort, targetPort, tries, expectedCount)
}
func (config *KubeProxyTestConfig) dialFromContainer(protocol, containerIP, targetIP string, containerHttpPort, targetPort, tries, expectedCount int) {
cmd := fmt.Sprintf("curl -q 'http://%s:%d/dial?request=hostName&protocol=%s&host=%s&port=%d&tries=%d'",
containerIP,
containerHttpPort,
protocol,
targetIP,
targetPort,
tries)
By(fmt.Sprintf("Dialing from container. Running command:%s", cmd))
stdout := framework.RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, cmd)
var output map[string][]string
err := json.Unmarshal([]byte(stdout), &output)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Could not unmarshal curl response: %s", stdout))
hostNamesMap := array2map(output["responses"])
Expect(len(hostNamesMap)).To(BeNumerically("==", expectedCount), fmt.Sprintf("Response was:%v", output))
}
func (config *KubeProxyTestConfig) dialFromNode(protocol, targetIP string, targetPort, tries, expectedCount int) {
var cmd string
if protocol == "udp" {
cmd = fmt.Sprintf("echo 'hostName' | timeout -t 3 nc -w 1 -u %s %d", targetIP, targetPort)
} else {
cmd = fmt.Sprintf("curl -s --connect-timeout 1 http://%s:%d/hostName", targetIP, targetPort)
}
// TODO: This simply tells us that we can reach the endpoints. Check that
// the probability of hitting a specific endpoint is roughly the same as
// hitting any other.
forLoop := fmt.Sprintf("for i in $(seq 1 %d); do %s; echo; sleep %v; done | grep -v '^\\s*$' |sort | uniq -c | wc -l", tries, cmd, hitEndpointRetryDelay)
By(fmt.Sprintf("Dialing from node. command:%s", forLoop))
stdout := framework.RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, forLoop)
Expect(strconv.Atoi(strings.TrimSpace(stdout))).To(BeNumerically("==", expectedCount))
}
func (config *KubeProxyTestConfig) getSelfURL(path string, expected string) {
cmd := fmt.Sprintf("curl -s --connect-timeout 1 http://localhost:10249%s", path)
By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
stdout := framework.RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, cmd)
Expect(strings.Contains(stdout, expected)).To(BeTrue())
}
func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node string) *api.Pod {
probe := &api.Probe{
InitialDelaySeconds: 10,
TimeoutSeconds: 30,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 3,
Handler: api.Handler{
HTTPGet: &api.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{IntVal: endpointHttpPort},
},
},
}
pod := &api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(),
},
ObjectMeta: api.ObjectMeta{
Name: podName,
Namespace: config.f.Namespace.Name,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "webserver",
Image: netexecImageName,
ImagePullPolicy: api.PullIfNotPresent,
Command: []string{
"/netexec",
fmt.Sprintf("--http-port=%d", endpointHttpPort),
fmt.Sprintf("--udp-port=%d", endpointUdpPort),
},
Ports: []api.ContainerPort{
{
Name: "http",
ContainerPort: endpointHttpPort,
},
{
Name: "udp",
ContainerPort: endpointUdpPort,
Protocol: api.ProtocolUDP,
},
},
LivenessProbe: probe,
ReadinessProbe: probe,
},
},
NodeName: node,
},
}
return pod
}
func (config *KubeProxyTestConfig) createTestPodSpec() *api.Pod {
pod := &api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(),
},
ObjectMeta: api.ObjectMeta{
Name: testPodName,
Namespace: config.f.Namespace.Name,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "webserver",
Image: netexecImageName,
ImagePullPolicy: api.PullIfNotPresent,
Command: []string{
"/netexec",
fmt.Sprintf("--http-port=%d", endpointHttpPort),
fmt.Sprintf("--udp-port=%d", endpointUdpPort),
},
Ports: []api.ContainerPort{
{
Name: "http",
ContainerPort: testContainerHttpPort,
},
},
},
},
},
}
return pod
}
func (config *KubeProxyTestConfig) createNodePortService(selector map[string]string) {
serviceSpec := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: nodePortServiceName,
},
Spec: api.ServiceSpec{
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{Port: clusterHttpPort, Name: "http", Protocol: api.ProtocolTCP, NodePort: nodeHttpPort, TargetPort: intstr.FromInt(endpointHttpPort)},
{Port: clusterUdpPort, Name: "udp", Protocol: api.ProtocolUDP, NodePort: nodeUdpPort, TargetPort: intstr.FromInt(endpointUdpPort)},
},
Selector: selector,
},
}
config.nodePortService = config.createService(serviceSpec)
}
func (config *KubeProxyTestConfig) deleteNodePortService() {
err := config.getServiceClient().Delete(config.nodePortService.Name)
Expect(err).NotTo(HaveOccurred(), "error while deleting NodePortService. err:%v)", err)
time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted.
}
func (config *KubeProxyTestConfig) createLoadBalancerService(selector map[string]string) {
serviceSpec := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: loadBalancerServiceName,
},
Spec: api.ServiceSpec{
Type: api.ServiceTypeLoadBalancer,
Ports: []api.ServicePort{
{Port: loadBalancerHttpPort, Name: "http", Protocol: "TCP", TargetPort: intstr.FromInt(endpointHttpPort)},
},
Selector: selector,
},
}
config.createService(serviceSpec)
}
func (config *KubeProxyTestConfig) deleteLoadBalancerService() {
go func() { config.getServiceClient().Delete(config.loadBalancerService.Name) }()
time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted.
}
func (config *KubeProxyTestConfig) waitForLoadBalancerIngressSetup() {
err := wait.Poll(2*time.Second, 120*time.Second, func() (bool, error) {
service, err := config.getServiceClient().Get(loadBalancerServiceName)
if err != nil {
return false, err
} else {
if len(service.Status.LoadBalancer.Ingress) > 0 {
return true, nil
} else {
return false, fmt.Errorf("Service LoadBalancer Ingress was not setup.")
}
}
})
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to setup Load Balancer Service. err:%v", err))
config.loadBalancerService, _ = config.getServiceClient().Get(loadBalancerServiceName)
}
func (config *KubeProxyTestConfig) createTestPods() {
testContainerPod := config.createTestPodSpec()
hostTestContainerPod := framework.NewHostExecPodSpec(config.f.Namespace.Name, hostTestPodName)
config.createPod(testContainerPod)
config.createPod(hostTestContainerPod)
framework.ExpectNoError(config.f.WaitForPodRunning(testContainerPod.Name))
framework.ExpectNoError(config.f.WaitForPodRunning(hostTestContainerPod.Name))
var err error
config.testContainerPod, err = config.getPodClient().Get(testContainerPod.Name)
if err != nil {
framework.Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
}
config.hostTestContainerPod, err = config.getPodClient().Get(hostTestContainerPod.Name)
if err != nil {
framework.Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
}
}
func (config *KubeProxyTestConfig) createService(serviceSpec *api.Service) *api.Service {
_, err := config.getServiceClient().Create(serviceSpec)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
err = framework.WaitForService(config.f.Client, config.f.Namespace.Name, serviceSpec.Name, true, 5*time.Second, 45*time.Second)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err))
createdService, err := config.getServiceClient().Get(serviceSpec.Name)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
return createdService
}
func (config *KubeProxyTestConfig) setup() {
By("creating a selector")
selectorName := "selector-" + string(uuid.NewUUID())
serviceSelector := map[string]string{
selectorName: "true",
}
By("Getting node addresses")
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client))
nodeList := framework.GetReadySchedulableNodesOrDie(config.f.Client)
config.externalAddrs = framework.NodeAddresses(nodeList, api.NodeExternalIP)
if len(config.externalAddrs) < 2 {
// fall back to legacy IPs
config.externalAddrs = framework.NodeAddresses(nodeList, api.NodeLegacyHostIP)
}
Expect(len(config.externalAddrs)).To(BeNumerically(">=", 2), fmt.Sprintf("At least two nodes necessary with an external or LegacyHostIP"))
config.nodes = nodeList.Items
if enableLoadBalancerTest {
By("Creating the LoadBalancer Service on top of the pods in kubernetes")
config.createLoadBalancerService(serviceSelector)
}
By("Creating the service pods in kubernetes")
podName := "netserver"
config.endpointPods = config.createNetProxyPods(podName, serviceSelector)
By("Creating the service on top of the pods in kubernetes")
config.createNodePortService(serviceSelector)
By("Creating test pods")
config.createTestPods()
}
func (config *KubeProxyTestConfig) cleanup() {
nsClient := config.getNamespacesClient()
nsList, err := nsClient.List(api.ListOptions{})
if err == nil {
for _, ns := range nsList.Items {
if strings.Contains(ns.Name, config.f.BaseName) && ns.Name != config.f.Namespace.Name {
nsClient.Delete(ns.Name)
}
}
}
}
func (config *KubeProxyTestConfig) createNetProxyPods(podName string, selector map[string]string) []*api.Pod {
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client))
nodes := framework.GetReadySchedulableNodesOrDie(config.f.Client)
// create pods, one for each node
createdPods := make([]*api.Pod, 0, len(nodes.Items))
for i, n := range nodes.Items {
podName := fmt.Sprintf("%s-%d", podName, i)
pod := config.createNetShellPodSpec(podName, n.Name)
pod.ObjectMeta.Labels = selector
createdPod := config.createPod(pod)
createdPods = append(createdPods, createdPod)
}
// wait that all of them are up
runningPods := make([]*api.Pod, 0, len(nodes.Items))
for _, p := range createdPods {
framework.ExpectNoError(config.f.WaitForPodReady(p.Name))
rp, err := config.getPodClient().Get(p.Name)
framework.ExpectNoError(err)
runningPods = append(runningPods, rp)
}
return runningPods
}
func (config *KubeProxyTestConfig) deleteNetProxyPod() {
pod := config.endpointPods[0]
config.getPodClient().Delete(pod.Name, api.NewDeleteOptions(0))
config.endpointPods = config.endpointPods[1:]
// wait for pod being deleted.
err := framework.WaitForPodToDisappear(config.f.Client, config.f.Namespace.Name, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout)
if err != nil {
framework.Failf("Failed to delete %s pod: %v", pod.Name, err)
}
// wait for endpoint being removed.
err = framework.WaitForServiceEndpointsNum(config.f.Client, config.f.Namespace.Name, nodePortServiceName, len(config.endpointPods), time.Second, wait.ForeverTestTimeout)
if err != nil {
framework.Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
}
// wait for kube-proxy to catch up with the pod being deleted.
time.Sleep(5 * time.Second)
}
func (config *KubeProxyTestConfig) createPod(pod *api.Pod) *api.Pod {
createdPod, err := config.getPodClient().Create(pod)
if err != nil {
framework.Failf("Failed to create %s pod: %v", pod.Name, err)
}
return createdPod
}
func (config *KubeProxyTestConfig) getPodClient() client.PodInterface {
return config.f.Client.Pods(config.f.Namespace.Name)
}
func (config *KubeProxyTestConfig) getServiceClient() client.ServiceInterface {
return config.f.Client.Services(config.f.Namespace.Name)
}
func (config *KubeProxyTestConfig) getNamespacesClient() client.NamespaceInterface {
return config.f.Client.Namespaces()
}
func array2map(arr []string) map[string]bool {
retval := make(map[string]bool)
if len(arr) == 0 {
return retval
}
for _, str := range arr {
retval[str] = true
}
return retval
}

View File

@ -19,26 +19,21 @@ package e2e
import (
"fmt"
"net/http"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = framework.KubeDescribe("Networking", func() {
f := framework.NewDefaultFramework("nettest")
var svcname = "nettest"
f := framework.NewDefaultFramework(svcname)
BeforeEach(func() {
//Assert basic external connectivity.
//Since this is not really a test of kubernetes in any way, we
//leave it as a pre-test assertion, rather than a Ginko test.
// Assert basic external connectivity.
// Since this is not really a test of kubernetes in any way, we
// leave it as a pre-test assertion, rather than a Ginko test.
By("Executing a successful http request from the external internet")
resp, err := http.Get("http://google.com")
if err != nil {
@ -79,235 +74,152 @@ var _ = framework.KubeDescribe("Networking", func() {
}
})
//Now we can proceed with the test.
It("should function for intra-pod communication [Conformance]", func() {
It("should check kube-proxy urls", func() {
// TODO: this is overkill we just need the host networking pod
// to hit kube-proxy urls.
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("Creating a service named %q in namespace %q", svcname, f.Namespace.Name))
svc, err := f.Client.Services(f.Namespace.Name).Create(&api.Service{
ObjectMeta: api.ObjectMeta{
Name: svcname,
Labels: map[string]string{
"name": svcname,
},
},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
Protocol: "TCP",
Port: 8080,
TargetPort: intstr.FromInt(8080),
}},
Selector: map[string]string{
"name": svcname,
},
},
})
if err != nil {
framework.Failf("unable to create test service named [%s] %v", svc.Name, err)
}
// Clean up service
defer func() {
By("Cleaning up the service")
if err = f.Client.Services(f.Namespace.Name).Delete(svc.Name); err != nil {
framework.Failf("unable to delete svc %v: %v", svc.Name, err)
}
}()
By("Creating a webserver (pending) pod on each node")
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(f.Client))
nodes := framework.GetReadySchedulableNodesOrDie(f.Client)
// This test is super expensive in terms of network usage - large services
// result in huge "Endpoint" objects and all underlying pods read them
// periodically. Moreover, all KubeProxies watch all of them.
// Thus we limit the maximum number of pods under a service.
//
// TODO: Remove this limitation once services, endpoints and data flows
// between nodes and master are better optimized.
maxNodeCount := 250
if len(nodes.Items) > maxNodeCount {
nodes.Items = nodes.Items[:maxNodeCount]
}
if len(nodes.Items) == 1 {
// in general, the test requires two nodes. But for local development, often a one node cluster
// is created, for simplicity and speed. (see issue #10012). We permit one-node test
// only in some cases
if !framework.ProviderIs("local") {
framework.Failf(fmt.Sprintf("The test requires two Ready nodes on %s, but found just one.", framework.TestContext.Provider))
}
framework.Logf("Only one ready node is detected. The test has limited scope in such setting. " +
"Rerun it with at least two nodes to get complete coverage.")
}
podNames := LaunchNetTestPodPerNode(f, nodes, svcname)
// Clean up the pods
defer func() {
By("Cleaning up the webserver pods")
for _, podName := range podNames {
if err = f.Client.Pods(f.Namespace.Name).Delete(podName, nil); err != nil {
framework.Logf("Failed to delete pod %s: %v", podName, err)
}
}
}()
By("Waiting for the webserver pods to transition to Running state")
for _, podName := range podNames {
err = f.WaitForPodRunning(podName)
Expect(err).NotTo(HaveOccurred())
}
By("Waiting for connectivity to be verified")
passed := false
//once response OK, evaluate response body for pass/fail.
var body []byte
getDetails := func() ([]byte, error) {
proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
if errProxy != nil {
return nil, errProxy
}
return proxyRequest.Namespace(f.Namespace.Name).
Name(svc.Name).
Suffix("read").
DoRaw()
}
getStatus := func() ([]byte, error) {
proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
if errProxy != nil {
return nil, errProxy
}
return proxyRequest.Namespace(f.Namespace.Name).
Name(svc.Name).
Suffix("status").
DoRaw()
}
// nettest containers will wait for all service endpoints to come up for 2 minutes
// apply a 3 minutes observation period here to avoid this test to time out before the nettest starts to contact peers
timeout := time.Now().Add(3 * time.Minute)
for i := 0; !passed && timeout.After(time.Now()); i++ {
time.Sleep(2 * time.Second)
framework.Logf("About to make a proxy status call")
start := time.Now()
body, err = getStatus()
framework.Logf("Proxy status call returned in %v", time.Since(start))
if err != nil {
framework.Logf("Attempt %v: service/pod still starting. (error: '%v')", i, err)
continue
}
// Finally, we pass/fail the test based on if the container's response body, as to whether or not it was able to find peers.
switch {
case string(body) == "pass":
framework.Logf("Passed on attempt %v. Cleaning up.", i)
passed = true
case string(body) == "running":
framework.Logf("Attempt %v: test still running", i)
case string(body) == "fail":
if body, err = getDetails(); err != nil {
framework.Failf("Failed on attempt %v. Cleaning up. Error reading details: %v", i, err)
} else {
framework.Failf("Failed on attempt %v. Cleaning up. Details:\n%s", i, string(body))
}
case strings.Contains(string(body), "no endpoints available"):
framework.Logf("Attempt %v: waiting on service/endpoints", i)
default:
framework.Logf("Unexpected response:\n%s", body)
}
}
if !passed {
if body, err = getDetails(); err != nil {
framework.Failf("Timed out. Cleaning up. Error reading details: %v", err)
} else {
framework.Failf("Timed out. Cleaning up. Details:\n%s", string(body))
}
}
Expect(string(body)).To(Equal("pass"))
By("checking kube-proxy URLs")
config.getSelfURL("/healthz", "ok")
config.getSelfURL("/proxyMode", "iptables") // the default
})
framework.KubeDescribe("Granular Checks", func() {
framework.KubeDescribe("Granular Checks: Pods", func() {
connectivityTimeout := 10
It("should function for pod communication on a single node", func() {
By("Picking a node")
nodes := framework.GetReadySchedulableNodesOrDie(f.Client)
node := nodes.Items[0]
By("Creating a webserver pod")
podName := "same-node-webserver"
defer f.Client.Pods(f.Namespace.Name).Delete(podName, nil)
ip := framework.LaunchWebserverPod(f, podName, node.Name)
By("Checking that the webserver is accessible from a pod on the same node")
framework.ExpectNoError(framework.CheckConnectivityToHost(f, node.Name, "same-node-wget", ip, connectivityTimeout))
})
It("should function for pod communication between nodes", func() {
podClient := f.Client.Pods(f.Namespace.Name)
By("Picking multiple nodes")
nodes := framework.GetReadySchedulableNodesOrDie(f.Client)
if len(nodes.Items) == 1 {
framework.Skipf("The test requires two Ready nodes on %s, but found just one.", framework.TestContext.Provider)
// Try to hit all endpoints through a test container, retry 5 times,
// expect exactly one unique hostname. Each of these endpoints reports
// its own hostname.
It("should function for intra-pod communication: http [Conformance]", func() {
config := NewNetworkingTestConfig(f)
for _, endpointPod := range config.endpointPods {
config.dialFromTestContainer("http", endpointPod.Status.PodIP, endpointHttpPort, config.maxTries, 0, sets.NewString(endpointPod.Name))
}
node1 := nodes.Items[0]
node2 := nodes.Items[1]
By("Creating a webserver pod")
podName := "different-node-webserver"
defer podClient.Delete(podName, nil)
ip := framework.LaunchWebserverPod(f, podName, node1.Name)
By("Checking that the webserver is accessible from a pod on a different node")
framework.ExpectNoError(framework.CheckConnectivityToHost(f, node2.Name, "different-node-wget", ip, connectivityTimeout))
})
It("should function for intra-pod communication: udp [Conformance]", func() {
config := NewNetworkingTestConfig(f)
for _, endpointPod := range config.endpointPods {
config.dialFromTestContainer("udp", endpointPod.Status.PodIP, endpointUdpPort, config.maxTries, 0, sets.NewString(endpointPod.Name))
}
})
It("should function for node-pod communication: http [Conformance]", func() {
config := NewNetworkingTestConfig(f)
for _, endpointPod := range config.endpointPods {
config.dialFromNode("http", endpointPod.Status.PodIP, endpointHttpPort, config.maxTries, 0, sets.NewString(endpointPod.Name))
}
})
It("should function for node-pod communication: udp [Conformance]", func() {
config := NewNetworkingTestConfig(f)
for _, endpointPod := range config.endpointPods {
config.dialFromNode("udp", endpointPod.Status.PodIP, endpointUdpPort, config.maxTries, 0, sets.NewString(endpointPod.Name))
}
})
})
// TODO: Remove [Slow] when this has had enough bake time to prove presubmit worthiness.
framework.KubeDescribe("Granular Checks: Services [Slow]", func() {
It("should function for pod-Service: http", func() {
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("dialing(http) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterHttpPort))
config.dialFromTestContainer("http", config.clusterIP, clusterHttpPort, config.maxTries, 0, config.endpointHostnames())
By(fmt.Sprintf("dialing(http) %v --> %v:%v (nodeIP)", config.testContainerPod.Name, config.externalAddrs[0], config.nodeHttpPort))
config.dialFromTestContainer("http", config.nodeIP, config.nodeHttpPort, config.maxTries, 0, config.endpointHostnames())
})
It("should function for pod-Service: udp", func() {
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("dialing(udp) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterUdpPort))
config.dialFromTestContainer("udp", config.clusterIP, clusterUdpPort, config.maxTries, 0, config.endpointHostnames())
By(fmt.Sprintf("dialing(udp) %v --> %v:%v (nodeIP)", config.testContainerPod.Name, config.externalAddrs[0], config.nodeUdpPort))
config.dialFromTestContainer("udp", config.nodeIP, config.nodeUdpPort, config.maxTries, 0, config.endpointHostnames())
})
It("should function for node-Service: http", func() {
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("dialing(http) %v (node) --> %v:%v (config.clusterIP)", config.nodeIP, config.clusterIP, clusterHttpPort))
config.dialFromNode("http", config.clusterIP, clusterHttpPort, config.maxTries, 0, config.endpointHostnames())
By(fmt.Sprintf("dialing(http) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeHttpPort))
config.dialFromNode("http", config.nodeIP, config.nodeHttpPort, config.maxTries, 0, config.endpointHostnames())
})
It("should function for node-Service: udp", func() {
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("dialing(udp) %v (node) --> %v:%v (config.clusterIP)", config.nodeIP, config.clusterIP, clusterUdpPort))
config.dialFromNode("udp", config.clusterIP, clusterUdpPort, config.maxTries, 0, config.endpointHostnames())
By(fmt.Sprintf("dialing(udp) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeUdpPort))
config.dialFromNode("udp", config.nodeIP, config.nodeUdpPort, config.maxTries, 0, config.endpointHostnames())
})
It("should function for endpoint-Service: http", func() {
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("dialing(http) %v (endpoint) --> %v:%v (config.clusterIP)", config.endpointPods[0].Name, config.clusterIP, clusterHttpPort))
config.dialFromEndpointContainer("http", config.clusterIP, clusterHttpPort, config.maxTries, 0, config.endpointHostnames())
By(fmt.Sprintf("dialing(http) %v (endpoint) --> %v:%v (nodeIP)", config.endpointPods[0].Name, config.nodeIP, config.nodeHttpPort))
config.dialFromEndpointContainer("http", config.nodeIP, config.nodeHttpPort, config.maxTries, 0, config.endpointHostnames())
})
It("should function for endpoint-Service: udp", func() {
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("dialing(udp) %v (endpoint) --> %v:%v (config.clusterIP)", config.endpointPods[0].Name, config.clusterIP, clusterUdpPort))
config.dialFromEndpointContainer("udp", config.clusterIP, clusterUdpPort, config.maxTries, 0, config.endpointHostnames())
By(fmt.Sprintf("dialing(udp) %v (endpoint) --> %v:%v (nodeIP)", config.endpointPods[0].Name, config.nodeIP, config.nodeUdpPort))
config.dialFromEndpointContainer("udp", config.nodeIP, config.nodeUdpPort, config.maxTries, 0, config.endpointHostnames())
})
It("should update endpoints: http", func() {
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("dialing(http) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterHttpPort))
config.dialFromTestContainer("http", config.clusterIP, clusterHttpPort, config.maxTries, 0, config.endpointHostnames())
config.deleteNetProxyPod()
By(fmt.Sprintf("dialing(http) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterHttpPort))
config.dialFromTestContainer("http", config.clusterIP, clusterHttpPort, config.maxTries, config.maxTries, config.endpointHostnames())
})
It("should update endpoints: udp", func() {
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("dialing(udp) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterUdpPort))
config.dialFromTestContainer("udp", config.clusterIP, clusterUdpPort, config.maxTries, 0, config.endpointHostnames())
config.deleteNetProxyPod()
By(fmt.Sprintf("dialing(udp) %v --> %v:%v (config.clusterIP)", config.testContainerPod.Name, config.clusterIP, clusterUdpPort))
config.dialFromTestContainer("udp", config.clusterIP, clusterUdpPort, config.maxTries, config.maxTries, config.endpointHostnames())
})
// Slow because we confirm that the nodePort doesn't serve traffic, which requires a period of polling.
It("should update nodePort: http [Slow]", func() {
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("dialing(http) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeHttpPort))
config.dialFromNode("http", config.nodeIP, config.nodeHttpPort, config.maxTries, 0, config.endpointHostnames())
config.deleteNodePortService()
By(fmt.Sprintf("dialing(http) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeHttpPort))
config.dialFromNode("http", config.nodeIP, config.nodeHttpPort, config.maxTries, config.maxTries, sets.NewString())
})
// Slow because we confirm that the nodePort doesn't serve traffic, which requires a period of polling.
It("should update nodePort: udp [Slow]", func() {
config := NewNetworkingTestConfig(f)
By(fmt.Sprintf("dialing(udp) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeUdpPort))
config.dialFromNode("udp", config.nodeIP, config.nodeUdpPort, config.maxTries, 0, config.endpointHostnames())
config.deleteNodePortService()
By(fmt.Sprintf("dialing(udp) %v (node) --> %v:%v (nodeIP)", config.nodeIP, config.nodeIP, config.nodeUdpPort))
config.dialFromNode("udp", config.nodeIP, config.nodeUdpPort, config.maxTries, config.maxTries, sets.NewString())
})
// TODO: Test sessionAffinity #31712
})
})
func LaunchNetTestPodPerNode(f *framework.Framework, nodes *api.NodeList, name string) []string {
podNames := []string{}
totalPods := len(nodes.Items)
Expect(totalPods).NotTo(Equal(0))
for _, node := range nodes.Items {
pod, err := f.Client.Pods(f.Namespace.Name).Create(&api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: name + "-",
Labels: map[string]string{
"name": name,
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "webserver",
Image: "gcr.io/google_containers/nettest:1.9",
Args: []string{
"-service=" + name,
//peers >= totalPods should be asserted by the container.
//the nettest container finds peers by looking up list of svc endpoints.
fmt.Sprintf("-peers=%d", totalPods),
"-namespace=" + f.Namespace.Name},
Ports: []api.ContainerPort{{ContainerPort: 8080}},
},
},
NodeName: node.Name,
RestartPolicy: api.RestartPolicyNever,
},
})
Expect(err).NotTo(HaveOccurred())
framework.Logf("Created pod %s on node %s", pod.ObjectMeta.Name, node.Name)
podNames = append(podNames, pod.ObjectMeta.Name)
}
return podNames
}

View File

@ -0,0 +1,506 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"encoding/json"
"fmt"
"strings"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
api "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
)
const (
endpointHttpPort = 8080
endpointUdpPort = 8081
testContainerHttpPort = 8080
clusterHttpPort = 80
clusterUdpPort = 90
netexecImageName = "gcr.io/google_containers/netexec:1.5"
hostexecImageName = "gcr.io/google_containers/hostexec:1.2"
testPodName = "test-container-pod"
hostTestPodName = "host-test-container-pod"
nodePortServiceName = "node-port-service"
hitEndpointRetryDelay = 1 * time.Second
// Number of retries to hit a given set of endpoints. Needs to be high
// because we verify iptables statistical rr loadbalancing.
testTries = 30
)
// NewNetworkingTestConfig creates and sets up a new test config helper.
func NewNetworkingTestConfig(f *framework.Framework) *NetworkingTestConfig {
config := &NetworkingTestConfig{f: f, ns: f.Namespace.Name}
By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.ns))
config.setup()
return config
}
// NetworkingTestConfig is a convenience class around some utility methods
// for testing kubeproxy/networking/services/endpoints.
type NetworkingTestConfig struct {
// testContaienrPod is a test pod running the netexec image. It is capable
// of executing tcp/udp requests against ip:port.
testContainerPod *api.Pod
// hostTestContainerPod is a pod running with hostNetworking=true, and the
// hostexec image.
hostTestContainerPod *api.Pod
// endpointPods are the pods belonging to the Service created by this
// test config. Each invocation of `setup` creates a service with
// 1 pod per node running the netexecImage.
endpointPods []*api.Pod
f *framework.Framework
// nodePortService is a Service with Type=NodePort spanning over all
// endpointPods.
nodePortService *api.Service
// externalAddrs is a list of external IPs of nodes in the cluster.
externalAddrs []string
// nodes is a list of nodes in the cluster.
nodes []api.Node
// maxTries is the number of retries tolerated for tests run against
// endpoints and services created by this config.
maxTries int
// The clusterIP of the Service reated by this test config.
clusterIP string
// External ip of first node for use in nodePort testing.
nodeIP string
// The http/udp nodePorts of the Service.
nodeHttpPort int
nodeUdpPort int
// The kubernetes namespace within which all resources for this
// config are created
ns string
}
func (config *NetworkingTestConfig) dialFromEndpointContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
config.dialFromContainer(protocol, config.endpointPods[0].Status.PodIP, targetIP, endpointHttpPort, targetPort, maxTries, minTries, expectedEps)
}
func (config *NetworkingTestConfig) dialFromTestContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
config.dialFromContainer(protocol, config.testContainerPod.Status.PodIP, targetIP, testContainerHttpPort, targetPort, maxTries, minTries, expectedEps)
}
// diagnoseMissingEndpoints prints debug information about the endpoints that
// are NOT in the given list of foundEndpoints. These are the endpoints we
// expected a response from.
func (config *NetworkingTestConfig) diagnoseMissingEndpoints(foundEndpoints sets.String) {
for _, e := range config.endpointPods {
if foundEndpoints.Has(e.Name) {
continue
}
framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", e.Namespace, e.Name)
desc, _ := framework.RunKubectl(
"describe", "pod", e.Name, fmt.Sprintf("--namespace=%v", e.Namespace))
framework.Logf(desc)
}
}
// endpointHostnames returns a set of hostnames for existing endpoints.
func (config *NetworkingTestConfig) endpointHostnames() sets.String {
expectedEps := sets.NewString()
for _, p := range config.endpointPods {
expectedEps.Insert(p.Name)
}
return expectedEps
}
// dialFromContainers executes a curl via kubectl exec in a test container,
// which might then translate to a tcp or udp request based on the protocol
// argument in the url.
// - minTries is the minimum number of curl attempts required before declaring
// success. Set to 0 if you'd like to return as soon as all endpoints respond
// at least once.
// - maxTries is the maximum number of curl attempts. If this many attempts pass
// and we don't see all expected endpoints, the test fails.
// - expectedEps is the set of endpointnames to wait for. Typically this is also
// the hostname reported by each pod in the service through /hostName.
// maxTries == minTries will confirm that we see the expected endpoints and no
// more for maxTries. Use this if you want to eg: fail a readiness check on a
// pod and confirm it doesn't show up as an endpoint.
func (config *NetworkingTestConfig) dialFromContainer(protocol, containerIP, targetIP string, containerHttpPort, targetPort, maxTries, minTries int, expectedEps sets.String) {
cmd := fmt.Sprintf("curl -q -s 'http://%s:%d/dial?request=hostName&protocol=%s&host=%s&port=%d&tries=1'",
containerIP,
containerHttpPort,
protocol,
targetIP,
targetPort)
eps := sets.NewString()
for i := 0; i < maxTries; i++ {
stdout, err := framework.RunHostCmd(config.ns, config.hostTestContainerPod.Name, cmd)
if err != nil {
// A failure to kubectl exec counts as a try, not a hard fail.
// Also note that we will keep failing for maxTries in tests where
// we confirm unreachability.
framework.Logf("Failed to execute %v: %v", cmd, err)
} else {
var output map[string][]string
if err := json.Unmarshal([]byte(stdout), &output); err != nil {
framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
cmd, config.hostTestContainerPod.Name, stdout, err)
continue
}
for _, hostName := range output["responses"] {
eps.Insert(hostName)
}
}
framework.Logf("Waiting for endpoints: %v", expectedEps.Difference(eps))
// Check against i+1 so we exit if minTries == maxTries.
if (eps.Equal(expectedEps) || eps.Len() == 0 && expectedEps.Len() == 0) && i+1 >= minTries {
return
}
}
config.diagnoseMissingEndpoints(eps)
framework.Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", minTries, cmd, eps, expectedEps)
}
// dialFromNode executes a tcp or udp request based on protocol via kubectl exec
// in a test container running with host networking.
// - minTries is the minimum number of curl attempts required before declaring
// success. Set to 0 if you'd like to return as soon as all endpoints respond
// at least once.
// - maxTries is the maximum number of curl attempts. If this many attempts pass
// and we don't see all expected endpoints, the test fails.
// maxTries == minTries will confirm that we see the expected endpoints and no
// more for maxTries. Use this if you want to eg: fail a readiness check on a
// pod and confirm it doesn't show up as an endpoint.
func (config *NetworkingTestConfig) dialFromNode(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
var cmd string
if protocol == "udp" {
cmd = fmt.Sprintf("echo 'hostName' | timeout -t 3 nc -w 1 -u %s %d", targetIP, targetPort)
} else {
cmd = fmt.Sprintf("curl -q -s --connect-timeout 1 http://%s:%d/hostName", targetIP, targetPort)
}
// TODO: This simply tells us that we can reach the endpoints. Check that
// the probability of hitting a specific endpoint is roughly the same as
// hitting any other.
eps := sets.NewString()
filterCmd := fmt.Sprintf("%s | grep -v '^\\s*$'", cmd)
for i := 0; i < maxTries; i++ {
stdout, err := framework.RunHostCmd(config.ns, config.hostTestContainerPod.Name, filterCmd)
if err != nil {
// A failure to kubectl exec counts as a try, not a hard fail.
// Also note that we will keep failing for maxTries in tests where
// we confirm unreachability.
framework.Logf("Failed to execute %v: %v", filterCmd, err)
} else {
eps.Insert(strings.TrimSpace(stdout))
}
framework.Logf("Waiting for %+v endpoints, got endpoints %+v", expectedEps.Difference(eps), eps)
// Check against i+1 so we exit if minTries == maxTries.
if (eps.Equal(expectedEps) || eps.Len() == 0 && expectedEps.Len() == 0) && i+1 >= minTries {
return
}
}
config.diagnoseMissingEndpoints(eps)
framework.Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", minTries, cmd, eps, expectedEps)
}
// getSelfURL executes a curl against the given path via kubectl exec into a
// test container running with host networking, and fails if the output
// doesn't match the expected string.
func (config *NetworkingTestConfig) getSelfURL(path string, expected string) {
cmd := fmt.Sprintf("curl -q -s --connect-timeout 1 http://localhost:10249%s", path)
By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
stdout := framework.RunHostCmdOrDie(config.ns, config.hostTestContainerPod.Name, cmd)
Expect(strings.Contains(stdout, expected)).To(BeTrue())
}
func (config *NetworkingTestConfig) createNetShellPodSpec(podName string, node string) *api.Pod {
probe := &api.Probe{
InitialDelaySeconds: 10,
TimeoutSeconds: 30,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 3,
Handler: api.Handler{
HTTPGet: &api.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{IntVal: endpointHttpPort},
},
},
}
pod := &api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(),
},
ObjectMeta: api.ObjectMeta{
Name: podName,
Namespace: config.ns,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "webserver",
Image: netexecImageName,
ImagePullPolicy: api.PullIfNotPresent,
Command: []string{
"/netexec",
fmt.Sprintf("--http-port=%d", endpointHttpPort),
fmt.Sprintf("--udp-port=%d", endpointUdpPort),
},
Ports: []api.ContainerPort{
{
Name: "http",
ContainerPort: endpointHttpPort,
},
{
Name: "udp",
ContainerPort: endpointUdpPort,
Protocol: api.ProtocolUDP,
},
},
LivenessProbe: probe,
ReadinessProbe: probe,
},
},
NodeName: node,
},
}
return pod
}
func (config *NetworkingTestConfig) createTestPodSpec() *api.Pod {
pod := &api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(),
},
ObjectMeta: api.ObjectMeta{
Name: testPodName,
Namespace: config.ns,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "webserver",
Image: netexecImageName,
ImagePullPolicy: api.PullIfNotPresent,
Command: []string{
"/netexec",
fmt.Sprintf("--http-port=%d", endpointHttpPort),
fmt.Sprintf("--udp-port=%d", endpointUdpPort),
},
Ports: []api.ContainerPort{
{
Name: "http",
ContainerPort: testContainerHttpPort,
},
},
},
},
},
}
return pod
}
func (config *NetworkingTestConfig) createNodePortService(selector map[string]string) {
serviceSpec := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: nodePortServiceName,
},
Spec: api.ServiceSpec{
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{Port: clusterHttpPort, Name: "http", Protocol: api.ProtocolTCP, TargetPort: intstr.FromInt(endpointHttpPort)},
{Port: clusterUdpPort, Name: "udp", Protocol: api.ProtocolUDP, TargetPort: intstr.FromInt(endpointUdpPort)},
},
Selector: selector,
},
}
config.nodePortService = config.createService(serviceSpec)
}
func (config *NetworkingTestConfig) deleteNodePortService() {
err := config.getServiceClient().Delete(config.nodePortService.Name)
Expect(err).NotTo(HaveOccurred(), "error while deleting NodePortService. err:%v)", err)
time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted.
}
func (config *NetworkingTestConfig) createTestPods() {
testContainerPod := config.createTestPodSpec()
hostTestContainerPod := framework.NewHostExecPodSpec(config.ns, hostTestPodName)
config.createPod(testContainerPod)
config.createPod(hostTestContainerPod)
framework.ExpectNoError(config.f.WaitForPodRunning(testContainerPod.Name))
framework.ExpectNoError(config.f.WaitForPodRunning(hostTestContainerPod.Name))
var err error
config.testContainerPod, err = config.getPodClient().Get(testContainerPod.Name)
if err != nil {
framework.Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
}
config.hostTestContainerPod, err = config.getPodClient().Get(hostTestContainerPod.Name)
if err != nil {
framework.Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
}
}
func (config *NetworkingTestConfig) createService(serviceSpec *api.Service) *api.Service {
_, err := config.getServiceClient().Create(serviceSpec)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
err = framework.WaitForService(config.f.Client, config.ns, serviceSpec.Name, true, 5*time.Second, 45*time.Second)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err))
createdService, err := config.getServiceClient().Get(serviceSpec.Name)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
return createdService
}
func (config *NetworkingTestConfig) setup() {
By("creating a selector")
selectorName := "selector-" + string(uuid.NewUUID())
serviceSelector := map[string]string{
selectorName: "true",
}
By("Getting node addresses")
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client))
nodeList := framework.GetReadySchedulableNodesOrDie(config.f.Client)
config.externalAddrs = framework.NodeAddresses(nodeList, api.NodeExternalIP)
if len(config.externalAddrs) < 2 {
// fall back to legacy IPs
config.externalAddrs = framework.NodeAddresses(nodeList, api.NodeLegacyHostIP)
}
Expect(len(config.externalAddrs)).To(BeNumerically(">=", 2), fmt.Sprintf("At least two nodes necessary with an external or LegacyHostIP"))
config.nodes = nodeList.Items
By("Creating the service pods in kubernetes")
podName := "netserver"
config.endpointPods = config.createNetProxyPods(podName, serviceSelector)
By("Creating the service on top of the pods in kubernetes")
config.createNodePortService(serviceSelector)
By("Creating test pods")
config.createTestPods()
for _, p := range config.nodePortService.Spec.Ports {
switch p.Protocol {
case api.ProtocolUDP:
config.nodeUdpPort = int(p.NodePort)
case api.ProtocolTCP:
config.nodeHttpPort = int(p.NodePort)
default:
continue
}
}
epCount := len(config.endpointPods)
config.maxTries = epCount*epCount + testTries
config.clusterIP = config.nodePortService.Spec.ClusterIP
config.nodeIP = config.externalAddrs[0]
}
func (config *NetworkingTestConfig) cleanup() {
nsClient := config.getNamespacesClient()
nsList, err := nsClient.List(api.ListOptions{})
if err == nil {
for _, ns := range nsList.Items {
if strings.Contains(ns.Name, config.f.BaseName) && ns.Name != config.ns {
nsClient.Delete(ns.Name)
}
}
}
}
func (config *NetworkingTestConfig) createNetProxyPods(podName string, selector map[string]string) []*api.Pod {
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client))
nodes := framework.GetReadySchedulableNodesOrDie(config.f.Client)
// create pods, one for each node
createdPods := make([]*api.Pod, 0, len(nodes.Items))
for i, n := range nodes.Items {
podName := fmt.Sprintf("%s-%d", podName, i)
pod := config.createNetShellPodSpec(podName, n.Name)
pod.ObjectMeta.Labels = selector
createdPod := config.createPod(pod)
createdPods = append(createdPods, createdPod)
}
// wait that all of them are up
runningPods := make([]*api.Pod, 0, len(nodes.Items))
for _, p := range createdPods {
framework.ExpectNoError(config.f.WaitForPodReady(p.Name))
rp, err := config.getPodClient().Get(p.Name)
framework.ExpectNoError(err)
runningPods = append(runningPods, rp)
}
return runningPods
}
func (config *NetworkingTestConfig) deleteNetProxyPod() {
pod := config.endpointPods[0]
config.getPodClient().Delete(pod.Name, api.NewDeleteOptions(0))
config.endpointPods = config.endpointPods[1:]
// wait for pod being deleted.
err := framework.WaitForPodToDisappear(config.f.Client, config.ns, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout)
if err != nil {
framework.Failf("Failed to delete %s pod: %v", pod.Name, err)
}
// wait for endpoint being removed.
err = framework.WaitForServiceEndpointsNum(config.f.Client, config.ns, nodePortServiceName, len(config.endpointPods), time.Second, wait.ForeverTestTimeout)
if err != nil {
framework.Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
}
// wait for kube-proxy to catch up with the pod being deleted.
time.Sleep(5 * time.Second)
}
func (config *NetworkingTestConfig) createPod(pod *api.Pod) *api.Pod {
createdPod, err := config.getPodClient().Create(pod)
if err != nil {
framework.Failf("Failed to create %s pod: %v", pod.Name, err)
}
return createdPod
}
func (config *NetworkingTestConfig) getPodClient() client.PodInterface {
return config.f.Client.Pods(config.ns)
}
func (config *NetworkingTestConfig) getServiceClient() client.ServiceInterface {
return config.f.Client.Services(config.ns)
}
func (config *NetworkingTestConfig) getNamespacesClient() client.NamespaceInterface {
return config.f.Client.Namespaces()
}