diff --git a/test/e2e/framework/pod/resource.go b/test/e2e/framework/pod/resource.go index 6d850c849f6..e0d6479f8a1 100644 --- a/test/e2e/framework/pod/resource.go +++ b/test/e2e/framework/pod/resource.go @@ -516,19 +516,18 @@ func newExecPodSpec(ns, generateName string) *v1.Pod { return pod } -// CreateExecPodOrFail creates a simple busybox pod in a sleep loop used as a -// vessel for kubectl exec commands. -// Returns the name of the created pod. -func CreateExecPodOrFail(client clientset.Interface, ns, generateName string, tweak func(*v1.Pod)) string { +// CreateExecPodOrFail creates an agnhost pause pod used as a vessel for kubectl exec commands. +// The pod name is uniquely generated, and the created pod object is returned. +func CreateExecPodOrFail(client clientset.Interface, ns, generateName string, tweak func(*v1.Pod)) *v1.Pod { e2elog.Logf("Creating new exec pod") - execPod := newExecPodSpec(ns, generateName) + pod := newExecPodSpec(ns, generateName) if tweak != nil { - tweak(execPod) + tweak(pod) } - created, err := client.CoreV1().Pods(ns).Create(execPod) + execPod, err := client.CoreV1().Pods(ns).Create(pod) expectNoError(err, "failed to create new exec pod in namespace: %s", ns) err = wait.PollImmediate(poll, 5*time.Minute, func() (bool, error) { - retrievedPod, err := client.CoreV1().Pods(execPod.Namespace).Get(created.Name, metav1.GetOptions{}) + retrievedPod, err := client.CoreV1().Pods(execPod.Namespace).Get(execPod.Name, metav1.GetOptions{}) if err != nil { if testutils.IsRetryableAPIError(err) { return false, nil @@ -538,7 +537,7 @@ func CreateExecPodOrFail(client clientset.Interface, ns, generateName string, tw return retrievedPod.Status.Phase == v1.PodRunning, nil }) expectNoError(err) - return created.Name + return execPod } // CreatePodOrFail creates a pod with the specified containerPorts. 
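Since CreateExecPodOrFail now returns the full *v1.Pod rather than just its name, call sites no longer need the extra Pods().Get() round-trip they used to perform. A minimal usage sketch, assuming a clientset cs and namespace ns supplied by the surrounding e2e test setup (illustrative only, not part of this patch):

    execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod-", nil)
    // The returned object already carries the generated Name and the Namespace,
    // so a follow-up Get just to learn the pod name is no longer required.
    output, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, "hostname")
    framework.ExpectNoError(err, "failed to run command in pod %s, output: %q", execPod.Name, output)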
diff --git a/test/e2e/framework/service/BUILD b/test/e2e/framework/service/BUILD index abbfb5b46e9..e3e6ff90801 100644 --- a/test/e2e/framework/service/BUILD +++ b/test/e2e/framework/service/BUILD @@ -16,18 +16,23 @@ go_library( deps = [ "//pkg/apis/core:go_default_library", "//pkg/registry/core/service/portallocator:go_default_library", + "//staging/src/k8s.io/api/apps/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//test/e2e/framework:go_default_library", "//test/e2e/framework/log:go_default_library", diff --git a/test/e2e/framework/service/const.go b/test/e2e/framework/service/const.go index 0810f675b32..b4901f3cbc4 100644 --- a/test/e2e/framework/service/const.go +++ b/test/e2e/framework/service/const.go @@ -75,4 +75,7 @@ const ( // AffinityConfirmCount is the number of needed continuous requests to confirm that // affinity is enabled. AffinityConfirmCount = 15 + + // ServiceEndpointsTimeout is the maximum time in which endpoints for the service should be created. + ServiceEndpointsTimeout = 2 * time.Minute ) diff --git a/test/e2e/framework/service/hostname.go b/test/e2e/framework/service/hostname.go index f6019d0b694..98f393d2764 100644 --- a/test/e2e/framework/service/hostname.go +++ b/test/e2e/framework/service/hostname.go @@ -102,9 +102,9 @@ func StopServeHostnameService(clientset clientset.Interface, ns, name string) er // in the cluster. Each pod in the service is expected to echo its name. These // names are compared with the given expectedPods list after a sort | uniq. func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expectedPods []string, serviceIP string, servicePort int) error { - execPodName := e2epod.CreateExecPodOrFail(c, ns, "execpod-", nil) + execPod := e2epod.CreateExecPodOrFail(c, ns, "execpod-", nil) defer func() { - e2epod.DeletePodOrFail(c, ns, execPodName) + e2epod.DeletePodOrFail(c, ns, execPod.Name) }() // Loop a bunch of times - the proxy is randomized, so we want a good @@ -129,11 +129,11 @@ func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expect // verify service from pod func() string { cmd := buildCommand("wget -q -T 1 -O -") - e2elog.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPodName) + e2elog.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPod.Name) // TODO: Use exec-over-http via the netexec pod instead of kubectl exec. 
- output, err := framework.RunHostCmd(ns, execPodName, cmd) + output, err := framework.RunHostCmd(ns, execPod.Name, cmd) if err != nil { - e2elog.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPodName, err, output) + e2elog.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPod.Name, err, output) } return output }, diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go index b6e9541066e..eff844cc07f 100644 --- a/test/e2e/framework/service/jig.go +++ b/test/e2e/framework/service/jig.go @@ -21,35 +21,45 @@ import ( "fmt" "net" "net/http" + "regexp" "strconv" "strings" "time" "github.com/onsi/ginkgo" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" "k8s.io/kubernetes/test/e2e/framework" e2elog "k8s.io/kubernetes/test/e2e/framework/log" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + testutils "k8s.io/kubernetes/test/utils" imageutils "k8s.io/kubernetes/test/utils/image" ) // NodePortRange should match whatever the default/configured range is var NodePortRange = utilnet.PortRange{Base: 30000, Size: 2768} -// TestJig is a test j to help service testing. +// PauseDeploymentLabels are unique deployment selector labels for pause pod +var PauseDeploymentLabels = map[string]string{"deployment": "agnhost-pause"} + +// TestJig is a test jig to help service testing. 
type TestJig struct { ID string Name string @@ -335,6 +345,56 @@ func (j *TestJig) WaitForEndpointOnNode(namespace, serviceName, nodeName string) framework.ExpectNoError(err) } +// WaitForAvailableEndpoint waits for at least 1 endpoint to become available, up to the given timeout +func (j *TestJig) WaitForAvailableEndpoint(namespace, serviceName string, timeout time.Duration) { + // Wait for endpoints to be created; this may take longer if the service's backing pods take longer to start running + endpointSelector := fields.OneTermEqualSelector("metadata.name", serviceName) + stopCh := make(chan struct{}) + endpointAvailable := false + var controller cache.Controller + _, controller = cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = endpointSelector.String() + obj, err := j.Client.CoreV1().Endpoints(namespace).List(options) + return runtime.Object(obj), err + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = endpointSelector.String() + return j.Client.CoreV1().Endpoints(namespace).Watch(options) + }, + }, + &v1.Endpoints{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if e, ok := obj.(*v1.Endpoints); ok { + if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { + endpointAvailable = true + } + } + }, + UpdateFunc: func(old, cur interface{}) { + if e, ok := cur.(*v1.Endpoints); ok { + if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 { + endpointAvailable = true + } + } + }, + }, + ) + defer func() { + close(stopCh) + }() + + go controller.Run(stopCh) + + err := wait.Poll(1*time.Second, timeout, func() (bool, error) { + return endpointAvailable, nil + }) + framework.ExpectNoError(err, "No subset of available IP addresses found for the endpoint %s within timeout %v", serviceName, timeout) +} + // SanityCheckService performs sanity checks on the given service func (j *TestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceType) { if svc.Spec.Type != svcType { @@ -697,6 +757,111 @@ func (j *TestJig) waitForPodsReady(namespace string, pods []string) error { return nil } +func testReachabilityOverServiceName(serviceName string, sp v1.ServicePort, execPod *v1.Pod) { + testEndpointReachability(serviceName, sp.Port, sp.Protocol, execPod) +} + +func testReachabilityOverClusterIP(clusterIP string, sp v1.ServicePort, execPod *v1.Pod) { + // If .spec.clusterIP is set to "" or "None" for the service, no ClusterIP is allocated, so reachability cannot be tested over clusterIP:servicePort + isClusterIPV46, err := regexp.MatchString(framework.RegexIPv4+"|"+framework.RegexIPv6, clusterIP) + framework.ExpectNoError(err, "Unable to parse ClusterIP: %s", clusterIP) + if isClusterIPV46 { + testEndpointReachability(clusterIP, sp.Port, sp.Protocol, execPod) + } +} +func testReachabilityOverNodePorts(nodes *v1.NodeList, sp v1.ServicePort, pod *v1.Pod) { + internalAddrs := e2enode.CollectAddresses(nodes, v1.NodeInternalIP) + externalAddrs := e2enode.CollectAddresses(nodes, v1.NodeExternalIP) + for _, internalAddr := range internalAddrs { + testEndpointReachability(internalAddr, sp.NodePort, sp.Protocol, pod) + } + for _, externalAddr := range externalAddrs { + testEndpointReachability(externalAddr, sp.NodePort, sp.Protocol, pod) + } +} + +// testEndpointReachability tests reachability to endpoints (i.e. IP, ServiceName) and ports. The test request is initiated from the specified execPod. 
+// Only TCP and UDP based services are supported at the moment. +// TODO: add support to test SCTP Protocol based services. +func testEndpointReachability(endpoint string, port int32, protocol v1.Protocol, execPod *v1.Pod) { + ep := net.JoinHostPort(endpoint, strconv.Itoa(int(port))) + cmd := "" + switch protocol { + case v1.ProtocolTCP: + cmd = fmt.Sprintf("nc -z -t -w 2 %s %v", endpoint, port) + case v1.ProtocolUDP: + cmd = fmt.Sprintf("nc -z -u -w 2 %s %v", endpoint, port) + default: + e2elog.Failf("Service reachability check is not supported for %v", protocol) + } + if cmd != "" { + _, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd) + framework.ExpectNoError(err, "Service is not reachable on the following endpoint %s over %s protocol", ep, protocol) + } +} + +// checkClusterIPServiceReachability ensures that a service of type ClusterIP is reachable over +// - ServiceName:ServicePort, ClusterIP:ServicePort +func (j *TestJig) checkClusterIPServiceReachability(namespace string, svc *v1.Service, pod *v1.Pod) { + clusterIP := svc.Spec.ClusterIP + servicePorts := svc.Spec.Ports + + j.WaitForAvailableEndpoint(namespace, svc.Name, ServiceEndpointsTimeout) + + for _, servicePort := range servicePorts { + testReachabilityOverServiceName(svc.Name, servicePort, pod) + testReachabilityOverClusterIP(clusterIP, servicePort, pod) + } +} + +// checkNodePortServiceReachability ensures that services of type NodePort are reachable: +// - Internal clients should be able to reach the service via +// ServiceName:ServicePort, ClusterIP:ServicePort and NodeInternalIPs:NodePort +// - External clients should be able to reach the service via +// NodePublicIPs:NodePort +func (j *TestJig) checkNodePortServiceReachability(namespace string, svc *v1.Service, pod *v1.Pod) { + clusterIP := svc.Spec.ClusterIP + servicePorts := svc.Spec.Ports + + // Consider only 2 nodes for testing + nodes := j.GetNodes(2) + + j.WaitForAvailableEndpoint(namespace, svc.Name, ServiceEndpointsTimeout) + + for _, servicePort := range servicePorts { + testReachabilityOverServiceName(svc.Name, servicePort, pod) + testReachabilityOverClusterIP(clusterIP, servicePort, pod) + testReachabilityOverNodePorts(nodes, servicePort, pod) + } +} + +// checkExternalServiceReachability ensures that a service of type ExternalName resolves to an IP address and that no fake externalName is set. +// The FQDN of an in-cluster service is used as the externalName (so the test also works on air-gapped platforms). +func (j *TestJig) checkExternalServiceReachability(svc *v1.Service, pod *v1.Pod) { + // Service must resolve to IP + cmd := fmt.Sprintf("nslookup %s", svc.Name) + _, err := framework.RunHostCmd(pod.Namespace, pod.Name, cmd) + framework.ExpectNoError(err, "ExternalName service must resolve to IP") +} + +// CheckServiceReachability ensures that requests are served by the service. Only Services of type ClusterIP, NodePort and ExternalName are supported. +func (j *TestJig) CheckServiceReachability(namespace string, svc *v1.Service, pod *v1.Pod) { + svcType := svc.Spec.Type + + j.SanityCheckService(svc, svcType) + + switch svcType { + case v1.ServiceTypeClusterIP: + j.checkClusterIPServiceReachability(namespace, svc, pod) + case v1.ServiceTypeNodePort: + j.checkNodePortServiceReachability(namespace, svc, pod) + case v1.ServiceTypeExternalName: + j.checkExternalServiceReachability(svc, pod) + default: + e2elog.Failf("Unsupported service type \"%s\" to verify service reachability for \"%s\" service. 
This may be due to the diverse implementation of the service type.", svcType, svc.Name) + } +} + // LaunchNetexecPodOnNode launches a netexec pod on the given node. func (j *TestJig) LaunchNetexecPodOnNode(f *framework.Framework, nodeName, podName string, httpPort, udpPort int32, hostNetwork bool) { e2elog.Logf("Creating netexec pod %q on node %v in namespace %q", podName, nodeName, f.Namespace.Name) @@ -866,6 +1031,23 @@ func (j *TestJig) TestHTTPHealthCheckNodePort(host string, port int, request str return nil } +// CreateServicePods creates a replication controller with the same labels as the service +func (j *TestJig) CreateServicePods(c clientset.Interface, ns string, replica int) { + config := testutils.RCConfig{ + Client: c, + Name: j.Name, + Image: framework.ServeHostnameImage, + Command: []string{"/agnhost", "serve-hostname"}, + Namespace: ns, + Labels: j.Labels, + PollInterval: 3 * time.Second, + Timeout: framework.PodReadyBeforeTimeout, + Replicas: replica, + } + err := framework.RunRC(config) + framework.ExpectNoError(err, "Replicas must be created") +} + // CheckAffinity function tests whether the service affinity works as expected. // If affinity is expected, the test will return true once affinityConfirmCount // number of same response observed in a row. If affinity is not expected, the @@ -873,7 +1055,7 @@ func (j *TestJig) TestHTTPHealthCheckNodePort(host string, port int, request str // return false only in case of unexpected errors. func (j *TestJig) CheckAffinity(execPod *v1.Pod, targetIP string, targetPort int, shouldHold bool) bool { targetIPPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort)) - cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIPPort) + cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, targetIPPort) timeout := TestTimeout if execPod == nil { timeout = LoadBalancerPollTimeout @@ -957,6 +1139,55 @@ func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Re Transport: tr, Timeout: timeout, } - return client.Get(url) } + +// CreatePausePodDeployment creates a deployment for agnhost-pause pods running on different nodes +func (j *TestJig) CreatePausePodDeployment(name, ns string, replica int32) *appsv1.Deployment { + // terminationGracePeriod is set to 0 to reduce the deployment deletion time for the infinitely running pause pods. 
+ terminationGracePeriod := int64(0) + pauseDeployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: PauseDeploymentLabels, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replica, + Selector: &metav1.LabelSelector{ + MatchLabels: PauseDeploymentLabels, + }, + Strategy: appsv1.DeploymentStrategy{ + Type: appsv1.RollingUpdateDeploymentStrategyType, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: PauseDeploymentLabels, + }, + Spec: v1.PodSpec{ + TerminationGracePeriodSeconds: &terminationGracePeriod, + Affinity: &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{MatchLabels: PauseDeploymentLabels}, + TopologyKey: "kubernetes.io/hostname", + Namespaces: []string{ns}, + }, + }, + }, + }, + Containers: []v1.Container{ + { + Name: "agnhost-pause", + Image: imageutils.GetE2EImage(imageutils.Agnhost), + Args: []string{"pause"}, + }, + }, + }, + }, + }, + } + deployment, err := j.Client.AppsV1().Deployments(ns).Create(pauseDeployment) + framework.ExpectNoError(err, "Error in creating deployment for pause pod") + return deployment +} diff --git a/test/e2e/instrumentation/monitoring/stackdriver_metadata_agent.go b/test/e2e/instrumentation/monitoring/stackdriver_metadata_agent.go index a25e0a6d4b5..34acc89269e 100644 --- a/test/e2e/instrumentation/monitoring/stackdriver_metadata_agent.go +++ b/test/e2e/instrumentation/monitoring/stackdriver_metadata_agent.go @@ -73,7 +73,7 @@ func testAgent(f *framework.Framework, kubeClient clientset.Interface) { } // Create test pod with unique name. - e2epod.CreateExecPodOrFail(kubeClient, f.Namespace.Name, uniqueContainerName, func(pod *v1.Pod) { + _ = e2epod.CreateExecPodOrFail(kubeClient, f.Namespace.Name, uniqueContainerName, func(pod *v1.Pod) { pod.Spec.Containers[0].Name = uniqueContainerName }) defer kubeClient.CoreV1().Pods(f.Namespace.Name).Delete(uniqueContainerName, &metav1.DeleteOptions{}) diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 1290097fe88..06c17ce863e 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -38,6 +38,7 @@ import ( cloudprovider "k8s.io/cloud-provider" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/test/e2e/framework" + e2edeploy "k8s.io/kubernetes/test/e2e/framework/deployment" e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" e2elog "k8s.io/kubernetes/test/e2e/framework/log" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -292,38 +293,53 @@ var _ = SIGDescribe("Services", func() { serviceIP := tcpService.Spec.ClusterIP e2elog.Logf("sourceip-test cluster ip: %s", serviceIP) - ginkgo.By("Picking multiple nodes") - nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet) - - if len(nodes.Items) == 1 { - framework.Skipf("The test requires two Ready nodes on %s, but found just one.", framework.TestContext.Provider) + ginkgo.By("Picking 2 Nodes to test whether source IP is preserved or not") + nodes := jig.GetNodes(2) + nodeCounts := len(nodes.Items) + if nodeCounts < 2 { + framework.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts) } - node1 := nodes.Items[0] - node2 := nodes.Items[1] - ginkgo.By("Creating a webserver pod be part of the TCP service which echoes back source ip") serverPodName := "echoserver-sourceip" - jig.LaunchEchoserverPodOnNode(f, node1.Name, serverPodName) + 
jig.LaunchEchoserverPodOnNode(f, "", serverPodName) defer func() { e2elog.Logf("Cleaning up the echo server pod") err := cs.CoreV1().Pods(ns).Delete(serverPodName, nil) - framework.ExpectNoError(err, "failed to delete pod: %s on node: %s", serverPodName, node1.Name) + framework.ExpectNoError(err, "failed to delete pod: %s", serverPodName) }() // Waiting for service to expose endpoint. err := e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{serverPodName: {servicePort}}) framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) - ginkgo.By("Retrieve sourceip from a pod on the same node") - sourceIP1, execPodIP1 := execSourceipTest(f, cs, ns, node1.Name, serviceIP, servicePort) - ginkgo.By("Verifying the preserved source ip") - framework.ExpectEqual(sourceIP1, execPodIP1) + ginkgo.By("Creating pause pod deployment") + deployment := jig.CreatePausePodDeployment("pause-pod", ns, int32(nodeCounts)) - ginkgo.By("Retrieve sourceip from a pod on a different node") - sourceIP2, execPodIP2 := execSourceipTest(f, cs, ns, node2.Name, serviceIP, servicePort) - ginkgo.By("Verifying the preserved source ip") - framework.ExpectEqual(sourceIP2, execPodIP2) + defer func() { + e2elog.Logf("Deleting deployment") + err = cs.AppsV1().Deployments(ns).Delete(deployment.Name, &metav1.DeleteOptions{}) + framework.ExpectNoError(err, "Failed to delete deployment %s", deployment.Name) + }() + + framework.ExpectNoError(e2edeploy.WaitForDeploymentComplete(cs, deployment), "Failed to complete pause pod deployment") + + deployment, err = cs.AppsV1().Deployments(ns).Get(deployment.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "Error in retrieving pause pod deployment") + labelSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector) + + pausePods, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{LabelSelector: labelSelector.String()}) + framework.ExpectNoError(err, "Error in listing pods associated with pause pod deployment") + + gomega.Expect(pausePods.Items[0].Spec.NodeName).ToNot(gomega.Equal(pausePods.Items[1].Spec.NodeName)) + + serviceAddress := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort)) + + for _, pausePod := range pausePods.Items { + sourceIP, execPodIP := execSourceipTest(pausePod, serviceAddress) + ginkgo.By("Verifying the preserved source ip") + framework.ExpectEqual(sourceIP, execPodIP) + } }) ginkgo.It("should be able to up and down services", func() { @@ -499,35 +515,17 @@ var _ = SIGDescribe("Services", func() { ns := f.Namespace.Name jig := e2eservice.NewTestJig(cs, serviceName) - nodeIP, err := e2enode.PickIP(jig.Client) // for later - if err != nil { - e2elog.Logf("Unexpected error occurred: %v", err) - } - // TODO: write a wrapper for ExpectNoErrorWithOffset() - framework.ExpectNoErrorWithOffset(0, err) ginkgo.By("creating service " + serviceName + " with type=NodePort in namespace " + ns) - service := jig.CreateTCPServiceOrFail(ns, func(svc *v1.Service) { + nodePortService := jig.CreateTCPServiceOrFail(ns, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeNodePort + svc.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(9376)}, + } }) - jig.SanityCheckService(service, v1.ServiceTypeNodePort) - nodePort := int(service.Spec.Ports[0].NodePort) - - ginkgo.By("creating pod to be part of service " + serviceName) - jig.RunOrFail(ns, nil) - - ginkgo.By("hitting the pod through the service's NodePort") - 
jig.TestReachableHTTP(nodeIP, nodePort, e2eservice.KubeProxyLagTimeout) - - ginkgo.By("verifying the node port is locked") - hostExec := e2epod.LaunchHostExecPod(f.ClientSet, f.Namespace.Name, "hostexec") - // Even if the node-ip:node-port check above passed, this hostexec pod - // might fall on a node with a laggy kube-proxy. - cmd := fmt.Sprintf(`for i in $(seq 1 300); do if ss -ant46 'sport = :%d' | grep ^LISTEN; then exit 0; fi; sleep 1; done; exit 1`, nodePort) - stdout, err := framework.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd) - if err != nil { - e2elog.Failf("expected node port %d to be in use, stdout: %v. err: %v", nodePort, stdout, err) - } + jig.CreateServicePods(cs, ns, 2) + execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil) + jig.CheckServiceReachability(ns, nodePortService, execPod) }) // TODO: Get rid of [DisabledForLargeClusters] tag when issue #56138 is fixed. @@ -963,15 +961,19 @@ var _ = SIGDescribe("Services", func() { framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns) }() jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName) + ginkgo.By("changing the ExternalName service to type=ClusterIP") clusterIPService := jig.UpdateServiceOrFail(ns, externalNameService.Name, func(s *v1.Service) { s.Spec.Type = v1.ServiceTypeClusterIP s.Spec.ExternalName = "" s.Spec.Ports = []v1.ServicePort{ - {Port: 80, Name: "http", Protocol: v1.ProtocolTCP}, + {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(9376)}, } }) - jig.SanityCheckService(clusterIPService, v1.ServiceTypeClusterIP) + + jig.CreateServicePods(cs, ns, 2) + execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil) + jig.CheckServiceReachability(ns, clusterIPService, execPod) }) ginkgo.It("should be able to change the type from ExternalName to NodePort", func() { @@ -987,15 +989,19 @@ var _ = SIGDescribe("Services", func() { framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns) }() jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName) + ginkgo.By("changing the ExternalName service to type=NodePort") nodePortService := jig.UpdateServiceOrFail(ns, externalNameService.Name, func(s *v1.Service) { s.Spec.Type = v1.ServiceTypeNodePort s.Spec.ExternalName = "" s.Spec.Ports = []v1.ServicePort{ - {Port: 80, Name: "http", Protocol: v1.ProtocolTCP}, + {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(9376)}, } }) - jig.SanityCheckService(nodePortService, v1.ServiceTypeNodePort) + jig.CreateServicePods(cs, ns, 2) + + execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil) + jig.CheckServiceReachability(ns, nodePortService, execPod) }) ginkgo.It("should be able to change the type from ClusterIP to ExternalName", func() { @@ -1011,13 +1017,22 @@ var _ = SIGDescribe("Services", func() { framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns) }() jig.SanityCheckService(clusterIPService, v1.ServiceTypeClusterIP) + + ginkgo.By("Creating active service to test reachability when its FQDN is referred as externalName for another service") + externalServiceName := "externalsvc" + externalServiceFQDN := createAndGetExternalServiceFQDN(cs, ns, externalServiceName) + defer func() { + framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, externalServiceName)) + }() + ginkgo.By("changing the ClusterIP service to type=ExternalName") externalNameService := jig.UpdateServiceOrFail(ns, 
clusterIPService.Name, func(s *v1.Service) { s.Spec.Type = v1.ServiceTypeExternalName - s.Spec.ExternalName = "foo.example.com" + s.Spec.ExternalName = externalServiceFQDN s.Spec.ClusterIP = "" }) - jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName) + execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil) + jig.CheckServiceReachability(ns, externalNameService, execPod) }) ginkgo.It("should be able to change the type from NodePort to ExternalName", func() { @@ -1035,14 +1050,24 @@ var _ = SIGDescribe("Services", func() { framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns) }() jig.SanityCheckService(nodePortService, v1.ServiceTypeNodePort) + + ginkgo.By("Creating active service to test reachability when its FQDN is referred as externalName for another service") + externalServiceName := "externalsvc" + externalServiceFQDN := createAndGetExternalServiceFQDN(cs, ns, externalServiceName) + defer func() { + framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, externalServiceName)) + }() + ginkgo.By("changing the NodePort service to type=ExternalName") externalNameService := jig.UpdateServiceOrFail(ns, nodePortService.Name, func(s *v1.Service) { s.Spec.Type = v1.ServiceTypeExternalName - s.Spec.ExternalName = "foo.example.com" + s.Spec.ExternalName = externalServiceFQDN s.Spec.ClusterIP = "" s.Spec.Ports[0].NodePort = 0 }) - jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName) + execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil) + jig.CheckServiceReachability(ns, externalNameService, execPod) + }) ginkgo.It("should use same NodePort with same port but different protocols", func() { @@ -1345,8 +1370,9 @@ var _ = SIGDescribe("Services", func() { svcName := fmt.Sprintf("%v.%v.svc.%v", serviceName, f.Namespace.Name, framework.TestContext.ClusterDNSDomain) ginkgo.By("Waiting for endpoints of Service with DNS name " + svcName) - execPodName := e2epod.CreateExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-", nil) - cmd := fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port) + execPod := e2epod.CreateExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-", nil) + execPodName := execPod.Name + cmd := fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/", svcName, port) var stdout string if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) { var err error @@ -1370,7 +1396,7 @@ var _ = SIGDescribe("Services", func() { framework.ExpectNoError(err) ginkgo.By("Check if pod is unreachable") - cmd = fmt.Sprintf("wget -qO- -T 2 http://%s:%d/; test \"$?\" -eq \"1\"", svcName, port) + cmd = fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/; test \"$?\" -ne \"0\"", svcName, port) if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) { var err error stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd) @@ -1390,7 +1416,7 @@ var _ = SIGDescribe("Services", func() { framework.ExpectNoError(err) ginkgo.By("Check if terminating pod is available through service") - cmd = fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port) + cmd = fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/", svcName, port) if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) { var err error stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd) @@ -1441,13 +1467,8 @@ var _ = SIGDescribe("Services", func() { ginkgo.By("Prepare 
allow source ips") // prepare the exec pods // acceptPod are allowed to access the loadbalancer - acceptPodName := e2epod.CreateExecPodOrFail(cs, namespace, "execpod-accept", nil) - dropPodName := e2epod.CreateExecPodOrFail(cs, namespace, "execpod-drop", nil) - - acceptPod, err := cs.CoreV1().Pods(namespace).Get(acceptPodName, metav1.GetOptions{}) - framework.ExpectNoError(err, "failed to fetch pod: %s in namespace: %s", acceptPodName, namespace) - dropPod, err := cs.CoreV1().Pods(namespace).Get(dropPodName, metav1.GetOptions{}) - framework.ExpectNoError(err, "failed to fetch pod: %s in namespace: %s", dropPodName, namespace) + acceptPod := e2epod.CreateExecPodOrFail(cs, namespace, "execpod-accept", nil) + dropPod := e2epod.CreateExecPodOrFail(cs, namespace, "execpod-drop", nil) ginkgo.By("creating a pod to be part of the service " + serviceName) // This container is an nginx container listening on port 80 @@ -1465,7 +1486,7 @@ var _ = SIGDescribe("Services", func() { svc.Spec.Type = v1.ServiceTypeNodePort svc.Spec.LoadBalancerSourceRanges = nil }) - err = cs.CoreV1().Services(svc.Namespace).Delete(svc.Name, nil) + err := cs.CoreV1().Services(svc.Namespace).Delete(svc.Name, nil) framework.ExpectNoError(err) }() @@ -1479,23 +1500,23 @@ var _ = SIGDescribe("Services", func() { svcIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) // Wait longer as this is our first request after creation. We can't check using a separate method, // because the LB should only be reachable from the "accept" pod - framework.CheckReachabilityFromPod(true, loadBalancerLagTimeout, namespace, acceptPodName, svcIP) - framework.CheckReachabilityFromPod(false, normalReachabilityTimeout, namespace, dropPodName, svcIP) + framework.CheckReachabilityFromPod(true, loadBalancerLagTimeout, namespace, acceptPod.Name, svcIP) + framework.CheckReachabilityFromPod(false, normalReachabilityTimeout, namespace, dropPod.Name, svcIP) ginkgo.By("Update service LoadBalancerSourceRange and check reachability") jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) { // only allow access from dropPod svc.Spec.LoadBalancerSourceRanges = []string{dropPod.Status.PodIP + "/32"} }) - framework.CheckReachabilityFromPod(false, normalReachabilityTimeout, namespace, acceptPodName, svcIP) - framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, dropPodName, svcIP) + framework.CheckReachabilityFromPod(false, normalReachabilityTimeout, namespace, acceptPod.Name, svcIP) + framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, dropPod.Name, svcIP) ginkgo.By("Delete LoadBalancerSourceRange field and check reachability") jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) { svc.Spec.LoadBalancerSourceRanges = nil }) - framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, acceptPodName, svcIP) - framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, dropPodName, svcIP) + framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, acceptPod.Name, svcIP) + framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, dropPod.Name, svcIP) }) // TODO: Get rid of [DisabledForLargeClusters] tag when issue #56138 is fixed. 
@@ -1843,11 +1864,9 @@ var _ = SIGDescribe("Services", func() { podName := "execpod-noendpoints" ginkgo.By(fmt.Sprintf("creating %v on node %v", podName, nodeName)) - execPodName := e2epod.CreateExecPodOrFail(f.ClientSet, namespace, podName, func(pod *v1.Pod) { + execPod := e2epod.CreateExecPodOrFail(f.ClientSet, namespace, podName, func(pod *v1.Pod) { pod.Spec.NodeName = nodeName }) - execPod, err := f.ClientSet.CoreV1().Pods(namespace).Get(execPodName, metav1.GetOptions{}) - framework.ExpectNoError(err) serviceAddress := net.JoinHostPort(serviceName, strconv.Itoa(port)) e2elog.Logf("waiting up to %v to connect to %v", e2eservice.KubeProxyEndpointLagTimeout, serviceAddress) @@ -2161,18 +2180,16 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() { podName := "execpod-sourceip" ginkgo.By(fmt.Sprintf("Creating %v on node %v", podName, nodeName)) - execPodName := e2epod.CreateExecPodOrFail(f.ClientSet, namespace, podName, func(pod *v1.Pod) { + execPod := e2epod.CreateExecPodOrFail(f.ClientSet, namespace, podName, func(pod *v1.Pod) { pod.Spec.NodeName = nodeName }) defer func() { - err := cs.CoreV1().Pods(namespace).Delete(execPodName, nil) - framework.ExpectNoError(err, "failed to delete pod: %s", execPodName) + err := cs.CoreV1().Pods(namespace).Delete(execPod.Name, nil) + framework.ExpectNoError(err, "failed to delete pod: %s", execPod.Name) }() - execPod, err := f.ClientSet.CoreV1().Pods(namespace).Get(execPodName, metav1.GetOptions{}) - framework.ExpectNoError(err) - e2elog.Logf("Waiting up to %v wget %v", e2eservice.KubeProxyLagTimeout, path) - cmd := fmt.Sprintf(`wget -T 30 -qO- %v`, path) + e2elog.Logf("Waiting up to %v curl %v", e2eservice.KubeProxyLagTimeout, path) + cmd := fmt.Sprintf(`curl -q -s --connect-timeout 30 %v`, path) var srcIP string ginkgo.By(fmt.Sprintf("Hitting external lb %v from pod %v on node %v", ingressIP, podName, nodeName)) @@ -2298,31 +2315,20 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() { }) }) -func execSourceipTest(f *framework.Framework, c clientset.Interface, ns, nodeName, serviceIP string, servicePort int) (string, string) { - e2elog.Logf("Creating an exec pod on node %v", nodeName) - execPodName := e2epod.CreateExecPodOrFail(f.ClientSet, ns, fmt.Sprintf("execpod-sourceip-%s", nodeName), func(pod *v1.Pod) { - pod.Spec.NodeName = nodeName - }) - defer func() { - e2elog.Logf("Cleaning up the exec pod") - err := c.CoreV1().Pods(ns).Delete(execPodName, nil) - framework.ExpectNoError(err, "failed to delete pod: %s", execPodName) - }() - execPod, err := f.ClientSet.CoreV1().Pods(ns).Get(execPodName, metav1.GetOptions{}) - framework.ExpectNoError(err) - +func execSourceipTest(pausePod v1.Pod, serviceAddress string) (string, string) { + var err error var stdout string - serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort)) timeout := 2 * time.Minute - e2elog.Logf("Waiting up to %v wget %s", timeout, serviceIPPort) - cmd := fmt.Sprintf(`wget -T 30 -qO- %s | grep client_address`, serviceIPPort) + + e2elog.Logf("Waiting up to %v to get response from %s", timeout, serviceAddress) + cmd := fmt.Sprintf(`curl -q -s --connect-timeout 30 %s | grep client_address`, serviceAddress) for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) { - stdout, err = framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd) + stdout, err = framework.RunHostCmd(pausePod.Namespace, pausePod.Name, cmd) if err != nil { e2elog.Logf("got err: %v, retry until timeout", err) continue } - // Need to 
check output because wget -q might omit the error. + // Need to check the output because curl -s suppresses error output and may return an empty result. if strings.TrimSpace(stdout) == "" { e2elog.Logf("got empty stdout, retry until timeout") continue @@ -2339,7 +2345,7 @@ func execSourceipTest(f *framework.Framework, c clientset.Interface, ns, nodeNam // ginkgo.Fail the test if output format is unexpected. e2elog.Failf("exec pod returned unexpected stdout format: [%v]\n", stdout) } - return execPod.Status.PodIP, outputs[1] + return pausePod.Status.PodIP, outputs[1] } func execAffinityTestForNonLBServiceWithTransition(f *framework.Framework, cs clientset.Interface, svc *v1.Service) { @@ -2380,14 +2386,14 @@ func execAffinityTestForNonLBServiceWithOptionalTransition(f *framework.Framewor svcIP = svc.Spec.ClusterIP } - execPodName := e2epod.CreateExecPodOrFail(cs, ns, "execpod-affinity", nil) + execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod-affinity", nil) defer func() { e2elog.Logf("Cleaning up the exec pod") - err := cs.CoreV1().Pods(ns).Delete(execPodName, nil) - framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", execPodName, ns) + err := cs.CoreV1().Pods(ns).Delete(execPod.Name, nil) + framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", execPod.Name, ns) }() - execPod, err := cs.CoreV1().Pods(ns).Get(execPodName, metav1.GetOptions{}) - framework.ExpectNoError(err, "failed to fetch pod: %s in namespace: %s", execPodName, ns) + jig.CheckServiceReachability(ns, svc, execPod) + framework.ExpectNoError(err, "failed to fetch pod: %s in namespace: %s", execPod.Name, ns) if !isTransitionTest { gomega.Expect(jig.CheckAffinity(execPod, svcIP, servicePort, true)).To(gomega.BeTrue()) @@ -2451,3 +2457,9 @@ func execAffinityTestForLBServiceWithOptionalTransition(f *framework.Framework, gomega.Expect(jig.CheckAffinity(nil, ingressIP, port, true)).To(gomega.BeTrue()) } } + +func createAndGetExternalServiceFQDN(cs clientset.Interface, ns, serviceName string) string { + _, _, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(serviceName), ns, 2) + framework.ExpectNoError(err, "Expected Service %s to be running", serviceName) + return fmt.Sprintf("%s.%s.svc.%s", serviceName, ns, framework.TestContext.ClusterDNSDomain) +}
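Taken together, the new jig helpers replace the old per-test wget and Pods().Get() bookkeeping. A sketch of the intended flow, pieced together from the service.go call sites above (illustrative only, assuming cs, ns and jig come from a test's setup; not additional code in this patch):

    svc := jig.CreateTCPServiceOrFail(ns, func(svc *v1.Service) {
        svc.Spec.Type = v1.ServiceTypeNodePort
        svc.Spec.Ports = []v1.ServicePort{
            {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(9376)},
        }
    })
    // Back the service with serve-hostname replicas that carry the jig's labels.
    jig.CreateServicePods(cs, ns, 2)
    // Drive all checks from an agnhost exec pod.
    execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil)
    // CheckServiceReachability waits for endpoints via WaitForAvailableEndpoint and then
    // probes ServiceName:Port, ClusterIP:Port and, for NodePort services, node IPs at the NodePort.
    jig.CheckServiceReachability(ns, svc, execPod)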