Add service reachability test util function and e2e fixes

Mayank Gaikwad 2019-07-24 23:07:53 +05:30
parent 1dac5fd14a
commit 1438a3c8fc
7 changed files with 371 additions and 121 deletions

View File

@@ -516,19 +516,18 @@ func newExecPodSpec(ns, generateName string) *v1.Pod {
return pod
}
// CreateExecPodOrFail creates a simple busybox pod in a sleep loop used as a
// vessel for kubectl exec commands.
// Returns the name of the created pod.
func CreateExecPodOrFail(client clientset.Interface, ns, generateName string, tweak func(*v1.Pod)) string {
// CreateExecPodOrFail creates an agnhost pause pod used as a vessel for kubectl exec commands.
// Pod name is uniquely generated.
func CreateExecPodOrFail(client clientset.Interface, ns, generateName string, tweak func(*v1.Pod)) *v1.Pod {
e2elog.Logf("Creating new exec pod")
execPod := newExecPodSpec(ns, generateName)
pod := newExecPodSpec(ns, generateName)
if tweak != nil {
tweak(execPod)
tweak(pod)
}
created, err := client.CoreV1().Pods(ns).Create(execPod)
execPod, err := client.CoreV1().Pods(ns).Create(pod)
expectNoError(err, "failed to create new exec pod in namespace: %s", ns)
err = wait.PollImmediate(poll, 5*time.Minute, func() (bool, error) {
retrievedPod, err := client.CoreV1().Pods(execPod.Namespace).Get(created.Name, metav1.GetOptions{})
retrievedPod, err := client.CoreV1().Pods(execPod.Namespace).Get(execPod.Name, metav1.GetOptions{})
if err != nil {
if testutils.IsRetryableAPIError(err) {
return false, nil
@@ -538,7 +537,7 @@ func CreateExecPodOrFail(client clientset.Interface, ns, generateName string, tw
return retrievedPod.Status.Phase == v1.PodRunning, nil
})
expectNoError(err)
return created.Name
return execPod
}
// CreatePodOrFail creates a pod with the specified containerPorts.
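Note: CreateExecPodOrFail now hands back the created *v1.Pod rather than just its name, so callers no longer need a follow-up Get. A minimal call-site sketch, assuming a clientset cs and namespace ns:

    // the full pod object is available immediately, e.g. for exec'ing commands
    execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod-", nil)
    out, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, "hostname")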

View File

@@ -16,18 +16,23 @@ go_library(
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/registry/core/service/portallocator:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/framework/log:go_default_library",

View File

@@ -75,4 +75,7 @@ const (
// AffinityConfirmCount is the number of needed continuous requests to confirm that
// affinity is enabled.
AffinityConfirmCount = 15
// ServiceEndpointsTimeout is the maximum time in which endpoints for the service should be created.
ServiceEndpointsTimeout = 2 * time.Minute
)

View File

@@ -102,9 +102,9 @@ func StopServeHostnameService(clientset clientset.Interface, ns, name string) er
// in the cluster. Each pod in the service is expected to echo its name. These
// names are compared with the given expectedPods list after a sort | uniq.
func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expectedPods []string, serviceIP string, servicePort int) error {
execPodName := e2epod.CreateExecPodOrFail(c, ns, "execpod-", nil)
execPod := e2epod.CreateExecPodOrFail(c, ns, "execpod-", nil)
defer func() {
e2epod.DeletePodOrFail(c, ns, execPodName)
e2epod.DeletePodOrFail(c, ns, execPod.Name)
}()
// Loop a bunch of times - the proxy is randomized, so we want a good
@@ -129,11 +129,11 @@ func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expect
// verify service from pod
func() string {
cmd := buildCommand("wget -q -T 1 -O -")
e2elog.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPodName)
e2elog.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPod.Name)
// TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
output, err := framework.RunHostCmd(ns, execPodName, cmd)
output, err := framework.RunHostCmd(ns, execPod.Name, cmd)
if err != nil {
e2elog.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPodName, err, output)
e2elog.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPod.Name, err, output)
}
return output
},

View File

@@ -21,35 +21,45 @@ import (
"fmt"
"net"
"net/http"
"regexp"
"strconv"
"strings"
"time"
"github.com/onsi/ginkgo"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
)
// NodePortRange should match whatever the default/configured range is
var NodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
// PauseDeploymentLabels are unique deployment selector labels for pause pod
var PauseDeploymentLabels = map[string]string{"deployment": "agnhost-pause"}
// TestJig is a test jig to help service testing.
type TestJig struct {
ID string
Name string
@@ -335,6 +345,56 @@ func (j *TestJig) WaitForEndpointOnNode(namespace, serviceName, nodeName string)
framework.ExpectNoError(err)
}
// WaitForAvailableEndpoint waits for at least one endpoint to become available, until the given timeout.
func (j *TestJig) WaitForAvailableEndpoint(namespace, serviceName string, timeout time.Duration) {
// Wait for endpoints to be created; this may take longer if the service's backing pods are slow to start.
endpointSelector := fields.OneTermEqualSelector("metadata.name", serviceName)
stopCh := make(chan struct{})
endpointAvailable := false
var controller cache.Controller
_, controller = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = endpointSelector.String()
obj, err := j.Client.CoreV1().Endpoints(namespace).List(options)
return runtime.Object(obj), err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = endpointSelector.String()
return j.Client.CoreV1().Endpoints(namespace).Watch(options)
},
},
&v1.Endpoints{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if e, ok := obj.(*v1.Endpoints); ok {
if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
endpointAvailable = true
}
}
},
UpdateFunc: func(old, cur interface{}) {
if e, ok := cur.(*v1.Endpoints); ok {
if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
endpointAvailable = true
}
}
},
},
)
defer func() {
close(stopCh)
}()
go controller.Run(stopCh)
err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
return endpointAvailable, nil
})
framework.ExpectNoError(err, "No subset of available IP address found for the endpoint %s within timeout %v", serviceName, timeout)
}
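A minimal usage sketch, assuming a jig j and an already-created service svc (this is how the reachability checks below call it):

    j.WaitForAvailableEndpoint(namespace, svc.Name, ServiceEndpointsTimeout)

The informer keeps a watch open instead of re-listing endpoints on every tick; the wait.Poll loop only inspects the endpointAvailable flag set by the event handlers.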
// SanityCheckService performs sanity checks on the given service
func (j *TestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceType) {
if svc.Spec.Type != svcType {
@@ -697,6 +757,111 @@ func (j *TestJig) waitForPodsReady(namespace string, pods []string) error {
return nil
}
func testReachabilityOverServiceName(serviceName string, sp v1.ServicePort, execPod *v1.Pod) {
testEndpointReachability(serviceName, sp.Port, sp.Protocol, execPod)
}
func testReachabilityOverClusterIP(clusterIP string, sp v1.ServicePort, execPod *v1.Pod) {
// If the service's .spec.clusterIP is set to "" or "None", no ClusterIP is allocated, so reachability cannot be tested over clusterIP:servicePort
isClusterIPV46, err := regexp.MatchString(framework.RegexIPv4+"|"+framework.RegexIPv6, clusterIP)
framework.ExpectNoError(err, "Unable to parse ClusterIP: %s", clusterIP)
if isClusterIPV46 {
testEndpointReachability(clusterIP, sp.Port, sp.Protocol, execPod)
}
}
func testReachabilityOverNodePorts(nodes *v1.NodeList, sp v1.ServicePort, pod *v1.Pod) {
internalAddrs := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
externalAddrs := e2enode.CollectAddresses(nodes, v1.NodeExternalIP)
for _, internalAddr := range internalAddrs {
testEndpointReachability(internalAddr, sp.NodePort, sp.Protocol, pod)
}
for _, externalAddr := range externalAddrs {
testEndpointReachability(externalAddr, sp.NodePort, sp.Protocol, pod)
}
}
// testEndpointReachability tests reachability to endpoints (i.e. IP, ServiceName) and ports. The test request is initiated from the specified execPod.
// Only TCP- and UDP-based services are supported at the moment.
// TODO: add support to test SCTP Protocol based services.
func testEndpointReachability(endpoint string, port int32, protocol v1.Protocol, execPod *v1.Pod) {
ep := net.JoinHostPort(endpoint, strconv.Itoa(int(port)))
cmd := ""
switch protocol {
case v1.ProtocolTCP:
cmd = fmt.Sprintf("nc -z -t -w 2 %s %v", endpoint, port)
case v1.ProtocolUDP:
cmd = fmt.Sprintf("nc -z -u -w 2 %s %v", endpoint, port)
default:
e2elog.Failf("Service reachablity check is not supported for %v", protocol)
}
if cmd != "" {
_, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
framework.ExpectNoError(err, "Service is not reachable on following endpoint %s over %s protocol", ep, protocol)
}
}
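For illustration, a hypothetical TCP check against an assumed cluster IP; the command executed inside the pod mirrors the switch above:

    // runs "nc -z -t -w 2 10.0.0.10 80" inside execPod via kubectl exec
    testEndpointReachability("10.0.0.10", 80, v1.ProtocolTCP, execPod)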
// checkClusterIPServiceReachability ensures that a service of type ClusterIP is reachable over
// - ServiceName:ServicePort, ClusterIP:ServicePort
func (j *TestJig) checkClusterIPServiceReachability(namespace string, svc *v1.Service, pod *v1.Pod) {
clusterIP := svc.Spec.ClusterIP
servicePorts := svc.Spec.Ports
j.WaitForAvailableEndpoint(namespace, svc.Name, ServiceEndpointsTimeout)
for _, servicePort := range servicePorts {
testReachabilityOverServiceName(svc.Name, servicePort, pod)
testReachabilityOverClusterIP(clusterIP, servicePort, pod)
}
}
// checkNodePortServiceReachability ensures that services of type NodePort are reachable
// - Internal clients should be able to reach the service via -
// ServiceName:ServicePort, ClusterIP:ServicePort and NodeInternalIPs:NodePort
// - External clients should be able to reach the service via -
// NodePublicIPs:NodePort
func (j *TestJig) checkNodePortServiceReachability(namespace string, svc *v1.Service, pod *v1.Pod) {
clusterIP := svc.Spec.ClusterIP
servicePorts := svc.Spec.Ports
// Consider only 2 nodes for testing
nodes := j.GetNodes(2)
j.WaitForAvailableEndpoint(namespace, svc.Name, ServiceEndpointsTimeout)
for _, servicePort := range servicePorts {
testReachabilityOverServiceName(svc.Name, servicePort, pod)
testReachabilityOverClusterIP(clusterIP, servicePort, pod)
testReachabilityOverNodePorts(nodes, servicePort, pod)
}
}
// checkExternalServiceReachability ensures a service of type externalName resolves to an IP address, and that no fake externalName is set
// The FQDN of kubernetes is used as the externalName (for air-gapped platforms).
func (j *TestJig) checkExternalServiceReachability(svc *v1.Service, pod *v1.Pod) {
// Service must resolve to IP
cmd := fmt.Sprintf("nslookup %s", svc.Name)
_, err := framework.RunHostCmd(pod.Namespace, pod.Name, cmd)
framework.ExpectNoError(err, "ExternalName service must resolve to IP")
}
// CheckServiceReachability ensures that requests are served by the service. Only services of type ClusterIP, NodePort and ExternalName are supported.
func (j *TestJig) CheckServiceReachability(namespace string, svc *v1.Service, pod *v1.Pod) {
svcType := svc.Spec.Type
j.SanityCheckService(svc, svcType)
switch svcType {
case v1.ServiceTypeClusterIP:
j.checkClusterIPServiceReachability(namespace, svc, pod)
case v1.ServiceTypeNodePort:
j.checkNodePortServiceReachability(namespace, svc, pod)
case v1.ServiceTypeExternalName:
j.checkExternalServiceReachability(svc, pod)
default:
e2elog.Failf("Unsupported service type \"%s\" to verify service reachability for \"%s\" service. This may due to diverse implementation of the service type.", svcType, svc.Name)
}
}
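A sketch of the wiring used by the updated tests later in this commit:

    execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil)
    jig.CheckServiceReachability(ns, nodePortService, execPod)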
// LaunchNetexecPodOnNode launches a netexec pod on the given node.
func (j *TestJig) LaunchNetexecPodOnNode(f *framework.Framework, nodeName, podName string, httpPort, udpPort int32, hostNetwork bool) {
e2elog.Logf("Creating netexec pod %q on node %v in namespace %q", podName, nodeName, f.Namespace.Name)
@@ -866,6 +1031,23 @@ func (j *TestJig) TestHTTPHealthCheckNodePort(host string, port int, request str
return nil
}
// CreateServicePods creates a replication controller with the same labels as the service
func (j *TestJig) CreateServicePods(c clientset.Interface, ns string, replica int) {
config := testutils.RCConfig{
Client: c,
Name: j.Name,
Image: framework.ServeHostnameImage,
Command: []string{"/agnhost", "serve-hostname"},
Namespace: ns,
Labels: j.Labels,
PollInterval: 3 * time.Second,
Timeout: framework.PodReadyBeforeTimeout,
Replicas: replica,
}
err := framework.RunRC(config)
framework.ExpectNoError(err, "Replica must be created")
}
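As used by the updated service type tests below, a minimal sketch:

    // two serve-hostname replicas carrying the jig's labels, i.e. the service's selector
    jig.CreateServicePods(cs, ns, 2)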
// CheckAffinity function tests whether the service affinity works as expected.
// If affinity is expected, the test will return true once affinityConfirmCount
// number of same response observed in a row. If affinity is not expected, the
@@ -873,7 +1055,7 @@ func (j *TestJig) TestHTTPHealthCheckNodePort(host string, port int, request str
// return false only in case of unexpected errors.
func (j *TestJig) CheckAffinity(execPod *v1.Pod, targetIP string, targetPort int, shouldHold bool) bool {
targetIPPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort))
cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIPPort)
cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, targetIPPort)
timeout := TestTimeout
if execPod == nil {
timeout = LoadBalancerPollTimeout
@@ -957,6 +1139,55 @@ func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Re
Transport: tr,
Timeout: timeout,
}
return client.Get(url)
}
// CreatePausePodDeployment creates a deployment of agnhost-pause pods, spread across different nodes via pod anti-affinity
func (j *TestJig) CreatePausePodDeployment(name, ns string, replica int32) *appsv1.Deployment {
// terminationGracePeriod is set to 0 to reduce the deletion time of the infinitely running pause pods.
terminationGracePeriod := int64(0)
pauseDeployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: PauseDeploymentLabels,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replica,
Selector: &metav1.LabelSelector{
MatchLabels: PauseDeploymentLabels,
},
Strategy: appsv1.DeploymentStrategy{
Type: appsv1.RollingUpdateDeploymentStrategyType,
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: PauseDeploymentLabels,
},
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: &terminationGracePeriod,
Affinity: &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{MatchLabels: PauseDeploymentLabels},
TopologyKey: "kubernetes.io/hostname",
Namespaces: []string{ns},
},
},
},
},
Containers: []v1.Container{
{
Name: "agnhost-pause",
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Args: []string{"pause"},
},
},
},
},
},
}
deployment, err := j.Client.AppsV1().Deployments(ns).Create(pauseDeployment)
framework.ExpectNoError(err, "Error in creating deployment for pause pod")
return deployment
}
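A hedged usage sketch, mirroring the source-IP test below: create one pause pod per node, then wait for the rollout:

    deployment := jig.CreatePausePodDeployment("pause-pod", ns, int32(nodeCounts))
    framework.ExpectNoError(e2edeploy.WaitForDeploymentComplete(cs, deployment), "Failed to complete pause pod deployment")

The required anti-affinity term on kubernetes.io/hostname is what spreads the replicas across distinct nodes, assuming enough schedulable nodes exist.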

View File

@@ -73,7 +73,7 @@ func testAgent(f *framework.Framework, kubeClient clientset.Interface) {
}
// Create test pod with unique name.
e2epod.CreateExecPodOrFail(kubeClient, f.Namespace.Name, uniqueContainerName, func(pod *v1.Pod) {
_ = e2epod.CreateExecPodOrFail(kubeClient, f.Namespace.Name, uniqueContainerName, func(pod *v1.Pod) {
pod.Spec.Containers[0].Name = uniqueContainerName
})
defer kubeClient.CoreV1().Pods(f.Namespace.Name).Delete(uniqueContainerName, &metav1.DeleteOptions{})

View File

@@ -38,6 +38,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/test/e2e/framework"
e2edeploy "k8s.io/kubernetes/test/e2e/framework/deployment"
e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -292,38 +293,53 @@ var _ = SIGDescribe("Services", func() {
serviceIP := tcpService.Spec.ClusterIP
e2elog.Logf("sourceip-test cluster ip: %s", serviceIP)
ginkgo.By("Picking multiple nodes")
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
if len(nodes.Items) == 1 {
framework.Skipf("The test requires two Ready nodes on %s, but found just one.", framework.TestContext.Provider)
ginkgo.By("Picking 2 Nodes to test whether source IP is preserved or not")
nodes := jig.GetNodes(2)
nodeCounts := len(nodes.Items)
if nodeCounts < 2 {
framework.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
}
node1 := nodes.Items[0]
node2 := nodes.Items[1]
ginkgo.By("Creating a webserver pod be part of the TCP service which echoes back source ip")
serverPodName := "echoserver-sourceip"
jig.LaunchEchoserverPodOnNode(f, node1.Name, serverPodName)
jig.LaunchEchoserverPodOnNode(f, "", serverPodName)
defer func() {
e2elog.Logf("Cleaning up the echo server pod")
err := cs.CoreV1().Pods(ns).Delete(serverPodName, nil)
framework.ExpectNoError(err, "failed to delete pod: %s on node: %s", serverPodName, node1.Name)
framework.ExpectNoError(err, "failed to delete pod: %s on node", serverPodName)
}()
// Waiting for service to expose endpoint.
err := e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{serverPodName: {servicePort}})
framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns)
ginkgo.By("Retrieve sourceip from a pod on the same node")
sourceIP1, execPodIP1 := execSourceipTest(f, cs, ns, node1.Name, serviceIP, servicePort)
ginkgo.By("Verifying the preserved source ip")
framework.ExpectEqual(sourceIP1, execPodIP1)
ginkgo.By("Creating pause pod deployment")
deployment := jig.CreatePausePodDeployment("pause-pod", ns, int32(nodeCounts))
ginkgo.By("Retrieve sourceip from a pod on a different node")
sourceIP2, execPodIP2 := execSourceipTest(f, cs, ns, node2.Name, serviceIP, servicePort)
defer func() {
e2elog.Logf("Deleting deployment")
err = cs.AppsV1().Deployments(ns).Delete(deployment.Name, &metav1.DeleteOptions{})
framework.ExpectNoError(err, "Failed to delete deployment %s", deployment.Name)
}()
framework.ExpectNoError(e2edeploy.WaitForDeploymentComplete(cs, deployment), "Failed to complete pause pod deployment")
deployment, err = cs.AppsV1().Deployments(ns).Get(deployment.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "Error in retriving pause pod deployment")
labelSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
framework.ExpectNoError(err, "Error in converting deployment selector to label selector")
pausePods, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{LabelSelector: labelSelector.String()})
framework.ExpectNoError(err, "Error in listing pods associated with pause pod deployments")
gomega.Expect(pausePods.Items[0].Spec.NodeName).ToNot(gomega.Equal(pausePods.Items[1].Spec.NodeName))
serviceAddress := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
for _, pausePod := range pausePods.Items {
sourceIP, execPodIP := execSourceipTest(pausePod, serviceAddress)
ginkgo.By("Verifying the preserved source ip")
framework.ExpectEqual(sourceIP2, execPodIP2)
framework.ExpectEqual(sourceIP, execPodIP)
}
})
ginkgo.It("should be able to up and down services", func() {
@@ -499,36 +515,18 @@ var _ = SIGDescribe("Services", func() {
ns := f.Namespace.Name
jig := e2eservice.NewTestJig(cs, serviceName)
nodeIP, err := e2enode.PickIP(jig.Client) // for later
if err != nil {
e2elog.Logf("Unexpected error occurred: %v", err)
}
// TODO: write a wrapper for ExpectNoErrorWithOffset()
framework.ExpectNoErrorWithOffset(0, err)
ginkgo.By("creating service " + serviceName + " with type=NodePort in namespace " + ns)
service := jig.CreateTCPServiceOrFail(ns, func(svc *v1.Service) {
nodePortService := jig.CreateTCPServiceOrFail(ns, func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeNodePort
})
jig.SanityCheckService(service, v1.ServiceTypeNodePort)
nodePort := int(service.Spec.Ports[0].NodePort)
ginkgo.By("creating pod to be part of service " + serviceName)
jig.RunOrFail(ns, nil)
ginkgo.By("hitting the pod through the service's NodePort")
jig.TestReachableHTTP(nodeIP, nodePort, e2eservice.KubeProxyLagTimeout)
ginkgo.By("verifying the node port is locked")
hostExec := e2epod.LaunchHostExecPod(f.ClientSet, f.Namespace.Name, "hostexec")
// Even if the node-ip:node-port check above passed, this hostexec pod
// might fall on a node with a laggy kube-proxy.
cmd := fmt.Sprintf(`for i in $(seq 1 300); do if ss -ant46 'sport = :%d' | grep ^LISTEN; then exit 0; fi; sleep 1; done; exit 1`, nodePort)
stdout, err := framework.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
if err != nil {
e2elog.Failf("expected node port %d to be in use, stdout: %v. err: %v", nodePort, stdout, err)
svc.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(9376)},
}
})
jig.CreateServicePods(cs, ns, 2)
execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil)
jig.CheckServiceReachability(ns, nodePortService, execPod)
})
// TODO: Get rid of [DisabledForLargeClusters] tag when issue #56138 is fixed.
ginkgo.It("should be able to change the type and ports of a service [Slow] [DisabledForLargeClusters]", func() {
@@ -963,15 +961,19 @@ var _ = SIGDescribe("Services", func() {
framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
}()
jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName)
ginkgo.By("changing the ExternalName service to type=ClusterIP")
clusterIPService := jig.UpdateServiceOrFail(ns, externalNameService.Name, func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeClusterIP
s.Spec.ExternalName = ""
s.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP},
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(9376)},
}
})
jig.SanityCheckService(clusterIPService, v1.ServiceTypeClusterIP)
jig.CreateServicePods(cs, ns, 2)
execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil)
jig.CheckServiceReachability(ns, clusterIPService, execPod)
})
ginkgo.It("should be able to change the type from ExternalName to NodePort", func() {
@@ -987,15 +989,19 @@ var _ = SIGDescribe("Services", func() {
framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
}()
jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName)
ginkgo.By("changing the ExternalName service to type=NodePort")
nodePortService := jig.UpdateServiceOrFail(ns, externalNameService.Name, func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeNodePort
s.Spec.ExternalName = ""
s.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP},
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(9376)},
}
})
jig.SanityCheckService(nodePortService, v1.ServiceTypeNodePort)
jig.CreateServicePods(cs, ns, 2)
execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil)
jig.CheckServiceReachability(ns, nodePortService, execPod)
})
ginkgo.It("should be able to change the type from ClusterIP to ExternalName", func() {
@@ -1011,13 +1017,22 @@ var _ = SIGDescribe("Services", func() {
framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
}()
jig.SanityCheckService(clusterIPService, v1.ServiceTypeClusterIP)
ginkgo.By("Creating active service to test reachability when its FQDN is referred as externalName for another service")
externalServiceName := "externalsvc"
externalServiceFQDN := createAndGetExternalServiceFQDN(cs, ns, externalServiceName)
defer func() {
framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, externalServiceName))
}()
ginkgo.By("changing the ClusterIP service to type=ExternalName")
externalNameService := jig.UpdateServiceOrFail(ns, clusterIPService.Name, func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeExternalName
s.Spec.ExternalName = "foo.example.com"
s.Spec.ExternalName = externalServiceFQDN
s.Spec.ClusterIP = ""
})
jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName)
execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil)
jig.CheckServiceReachability(ns, externalNameService, execPod)
})
ginkgo.It("should be able to change the type from NodePort to ExternalName", func() {
@@ -1035,14 +1050,24 @@ var _ = SIGDescribe("Services", func() {
framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
}()
jig.SanityCheckService(nodePortService, v1.ServiceTypeNodePort)
ginkgo.By("Creating active service to test reachability when its FQDN is referred as externalName for another service")
externalServiceName := "externalsvc"
externalServiceFQDN := createAndGetExternalServiceFQDN(cs, ns, externalServiceName)
defer func() {
framework.ExpectNoError(e2eservice.StopServeHostnameService(f.ClientSet, ns, externalServiceName))
}()
ginkgo.By("changing the NodePort service to type=ExternalName")
externalNameService := jig.UpdateServiceOrFail(ns, nodePortService.Name, func(s *v1.Service) {
s.Spec.Type = v1.ServiceTypeExternalName
s.Spec.ExternalName = "foo.example.com"
s.Spec.ExternalName = externalServiceFQDN
s.Spec.ClusterIP = ""
s.Spec.Ports[0].NodePort = 0
})
jig.SanityCheckService(externalNameService, v1.ServiceTypeExternalName)
execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil)
jig.CheckServiceReachability(ns, externalNameService, execPod)
})
ginkgo.It("should use same NodePort with same port but different protocols", func() {
@@ -1345,8 +1370,9 @@ var _ = SIGDescribe("Services", func() {
svcName := fmt.Sprintf("%v.%v.svc.%v", serviceName, f.Namespace.Name, framework.TestContext.ClusterDNSDomain)
ginkgo.By("Waiting for endpoints of Service with DNS name " + svcName)
execPodName := e2epod.CreateExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-", nil)
cmd := fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port)
execPod := e2epod.CreateExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-", nil)
execPodName := execPod.Name
cmd := fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/", svcName, port)
var stdout string
if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
var err error
@@ -1370,7 +1396,7 @@ var _ = SIGDescribe("Services", func() {
framework.ExpectNoError(err)
ginkgo.By("Check if pod is unreachable")
cmd = fmt.Sprintf("wget -qO- -T 2 http://%s:%d/; test \"$?\" -eq \"1\"", svcName, port)
cmd = fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/; test \"$?\" -ne \"0\"", svcName, port)
if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
var err error
stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd)
@@ -1390,7 +1416,7 @@ var _ = SIGDescribe("Services", func() {
framework.ExpectNoError(err)
ginkgo.By("Check if terminating pod is available through service")
cmd = fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port)
cmd = fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/", svcName, port)
if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
var err error
stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd)
@@ -1441,13 +1467,8 @@ var _ = SIGDescribe("Services", func() {
ginkgo.By("Prepare allow source ips")
// prepare the exec pods
// acceptPod are allowed to access the loadbalancer
acceptPodName := e2epod.CreateExecPodOrFail(cs, namespace, "execpod-accept", nil)
dropPodName := e2epod.CreateExecPodOrFail(cs, namespace, "execpod-drop", nil)
acceptPod, err := cs.CoreV1().Pods(namespace).Get(acceptPodName, metav1.GetOptions{})
framework.ExpectNoError(err, "failed to fetch pod: %s in namespace: %s", acceptPodName, namespace)
dropPod, err := cs.CoreV1().Pods(namespace).Get(dropPodName, metav1.GetOptions{})
framework.ExpectNoError(err, "failed to fetch pod: %s in namespace: %s", dropPodName, namespace)
acceptPod := e2epod.CreateExecPodOrFail(cs, namespace, "execpod-accept", nil)
dropPod := e2epod.CreateExecPodOrFail(cs, namespace, "execpod-drop", nil)
ginkgo.By("creating a pod to be part of the service " + serviceName)
// This container is an nginx container listening on port 80
@@ -1465,7 +1486,7 @@ var _ = SIGDescribe("Services", func() {
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.LoadBalancerSourceRanges = nil
})
err = cs.CoreV1().Services(svc.Namespace).Delete(svc.Name, nil)
err := cs.CoreV1().Services(svc.Namespace).Delete(svc.Name, nil)
framework.ExpectNoError(err)
}()
@@ -1479,23 +1500,23 @@ var _ = SIGDescribe("Services", func() {
svcIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
// Wait longer as this is our first request after creation. We can't check using a separate method,
// because the LB should only be reachable from the "accept" pod
framework.CheckReachabilityFromPod(true, loadBalancerLagTimeout, namespace, acceptPodName, svcIP)
framework.CheckReachabilityFromPod(false, normalReachabilityTimeout, namespace, dropPodName, svcIP)
framework.CheckReachabilityFromPod(true, loadBalancerLagTimeout, namespace, acceptPod.Name, svcIP)
framework.CheckReachabilityFromPod(false, normalReachabilityTimeout, namespace, dropPod.Name, svcIP)
ginkgo.By("Update service LoadBalancerSourceRange and check reachability")
jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
// only allow access from dropPod
svc.Spec.LoadBalancerSourceRanges = []string{dropPod.Status.PodIP + "/32"}
})
framework.CheckReachabilityFromPod(false, normalReachabilityTimeout, namespace, acceptPodName, svcIP)
framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, dropPodName, svcIP)
framework.CheckReachabilityFromPod(false, normalReachabilityTimeout, namespace, acceptPod.Name, svcIP)
framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, dropPod.Name, svcIP)
ginkgo.By("Delete LoadBalancerSourceRange field and check reachability")
jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
svc.Spec.LoadBalancerSourceRanges = nil
})
framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, acceptPodName, svcIP)
framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, dropPodName, svcIP)
framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, acceptPod.Name, svcIP)
framework.CheckReachabilityFromPod(true, normalReachabilityTimeout, namespace, dropPod.Name, svcIP)
})
// TODO: Get rid of [DisabledForLargeClusters] tag when issue #56138 is fixed.
@@ -1843,11 +1864,9 @@ var _ = SIGDescribe("Services", func() {
podName := "execpod-noendpoints"
ginkgo.By(fmt.Sprintf("creating %v on node %v", podName, nodeName))
execPodName := e2epod.CreateExecPodOrFail(f.ClientSet, namespace, podName, func(pod *v1.Pod) {
execPod := e2epod.CreateExecPodOrFail(f.ClientSet, namespace, podName, func(pod *v1.Pod) {
pod.Spec.NodeName = nodeName
})
execPod, err := f.ClientSet.CoreV1().Pods(namespace).Get(execPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
serviceAddress := net.JoinHostPort(serviceName, strconv.Itoa(port))
e2elog.Logf("waiting up to %v to connect to %v", e2eservice.KubeProxyEndpointLagTimeout, serviceAddress)
@@ -2161,18 +2180,16 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
podName := "execpod-sourceip"
ginkgo.By(fmt.Sprintf("Creating %v on node %v", podName, nodeName))
execPodName := e2epod.CreateExecPodOrFail(f.ClientSet, namespace, podName, func(pod *v1.Pod) {
execPod := e2epod.CreateExecPodOrFail(f.ClientSet, namespace, podName, func(pod *v1.Pod) {
pod.Spec.NodeName = nodeName
})
defer func() {
err := cs.CoreV1().Pods(namespace).Delete(execPodName, nil)
framework.ExpectNoError(err, "failed to delete pod: %s", execPodName)
err := cs.CoreV1().Pods(namespace).Delete(execPod.Name, nil)
framework.ExpectNoError(err, "failed to delete pod: %s", execPod.Name)
}()
execPod, err := f.ClientSet.CoreV1().Pods(namespace).Get(execPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
e2elog.Logf("Waiting up to %v wget %v", e2eservice.KubeProxyLagTimeout, path)
cmd := fmt.Sprintf(`wget -T 30 -qO- %v`, path)
e2elog.Logf("Waiting up to %v curl %v", e2eservice.KubeProxyLagTimeout, path)
cmd := fmt.Sprintf(`curl -q -s --connect-timeout 30 %v`, path)
var srcIP string
ginkgo.By(fmt.Sprintf("Hitting external lb %v from pod %v on node %v", ingressIP, podName, nodeName))
@@ -2298,31 +2315,20 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
})
})
func execSourceipTest(f *framework.Framework, c clientset.Interface, ns, nodeName, serviceIP string, servicePort int) (string, string) {
e2elog.Logf("Creating an exec pod on node %v", nodeName)
execPodName := e2epod.CreateExecPodOrFail(f.ClientSet, ns, fmt.Sprintf("execpod-sourceip-%s", nodeName), func(pod *v1.Pod) {
pod.Spec.NodeName = nodeName
})
defer func() {
e2elog.Logf("Cleaning up the exec pod")
err := c.CoreV1().Pods(ns).Delete(execPodName, nil)
framework.ExpectNoError(err, "failed to delete pod: %s", execPodName)
}()
execPod, err := f.ClientSet.CoreV1().Pods(ns).Get(execPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
func execSourceipTest(pausePod v1.Pod, serviceAddress string) (string, string) {
var err error
var stdout string
serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
timeout := 2 * time.Minute
e2elog.Logf("Waiting up to %v wget %s", timeout, serviceIPPort)
cmd := fmt.Sprintf(`wget -T 30 -qO- %s | grep client_address`, serviceIPPort)
e2elog.Logf("Waiting up to %v to get response from %s", timeout, serviceAddress)
cmd := fmt.Sprintf(`curl -q -s --connect-timeout 30 %s | grep client_address`, serviceAddress)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
stdout, err = framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
stdout, err = framework.RunHostCmd(pausePod.Namespace, pausePod.Name, cmd)
if err != nil {
e2elog.Logf("got err: %v, retry until timeout", err)
continue
}
// Need to check output because wget -q might omit the error.
// Need to check the output because curl -s may swallow the error.
if strings.TrimSpace(stdout) == "" {
e2elog.Logf("got empty stdout, retry until timeout")
continue
@@ -2339,7 +2345,7 @@ func execSourceipTest(f *framework.Framework, c clientset.Interface, ns, nodeNam
// ginkgo.Fail the test if output format is unexpected.
e2elog.Failf("exec pod returned unexpected stdout format: [%v]\n", stdout)
}
return execPod.Status.PodIP, outputs[1]
return pausePod.Status.PodIP, outputs[1]
}
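The refactored helper is now invoked once per pause pod, roughly as in the test above:

    serviceAddress := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
    sourceIP, execPodIP := execSourceipTest(pausePod, serviceAddress)
    framework.ExpectEqual(sourceIP, execPodIP) // source IP was preserved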
func execAffinityTestForNonLBServiceWithTransition(f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
@@ -2380,14 +2386,14 @@ func execAffinityTestForNonLBServiceWithOptionalTransition(f *framework.Framewor
svcIP = svc.Spec.ClusterIP
}
execPodName := e2epod.CreateExecPodOrFail(cs, ns, "execpod-affinity", nil)
execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod-affinity", nil)
defer func() {
e2elog.Logf("Cleaning up the exec pod")
err := cs.CoreV1().Pods(ns).Delete(execPodName, nil)
framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", execPodName, ns)
err := cs.CoreV1().Pods(ns).Delete(execPod.Name, nil)
framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", execPod.Name, ns)
}()
execPod, err := cs.CoreV1().Pods(ns).Get(execPodName, metav1.GetOptions{})
framework.ExpectNoError(err, "failed to fetch pod: %s in namespace: %s", execPodName, ns)
jig.CheckServiceReachability(ns, svc, execPod)
framework.ExpectNoError(err, "failed to fetch pod: %s in namespace: %s", execPod.Name, ns)
if !isTransitionTest {
gomega.Expect(jig.CheckAffinity(execPod, svcIP, servicePort, true)).To(gomega.BeTrue())
@@ -2451,3 +2457,9 @@ func execAffinityTestForLBServiceWithOptionalTransition(f *framework.Framework,
gomega.Expect(jig.CheckAffinity(nil, ingressIP, port, true)).To(gomega.BeTrue())
}
}
func createAndGetExternalServiceFQDN(cs clientset.Interface, ns, serviceName string) string {
_, _, err := e2eservice.StartServeHostnameService(cs, getServeHostnameService(serviceName), ns, 2)
framework.ExpectNoError(err, "Expected Service %s to be running", serviceName)
return fmt.Sprintf("%s.%s.svc.%s", serviceName, ns, framework.TestContext.ClusterDNSDomain)
}
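Used by the ExternalName type-change tests above, roughly:

    externalServiceFQDN := createAndGetExternalServiceFQDN(cs, ns, "externalsvc")
    // later assigned to the updated service: s.Spec.ExternalName = externalServiceFQDN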