diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index 168991dbe82..1748c3f881b 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -95,6 +95,10 @@ const ( // GCPMaxInstancesInInstanceGroup is the maximum number of instances supported in // one instance group on GCP. GCPMaxInstancesInInstanceGroup = 2000 + + // AffinityConfirmCount is the number of needed continuous requests to confirm that + // affinity is enabled. + AffinityConfirmCount = 15 ) // This should match whatever the default/configured range is @@ -1207,26 +1211,13 @@ func ValidateEndpointsOrFail(c clientset.Interface, namespace, serviceName strin Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, ServiceStartTimeout) } -// StartServeHostnameService creates a replication controller that serves its hostname and a service on top of it. -func StartServeHostnameService(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, port, replicas int) ([]string, string, error) { +// StartServeHostnameService creates a replication controller that serves its +// hostname and a service on top of it. +func StartServeHostnameService(c clientset.Interface, internalClient internalclientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) { podNames := make([]string, replicas) - + name := svc.ObjectMeta.Name By("creating service " + name + " in namespace " + ns) - _, err := c.CoreV1().Services(ns).Create(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{{ - Port: int32(port), - TargetPort: intstr.FromInt(9376), - Protocol: "TCP", - }}, - Selector: map[string]string{ - "name": name, - }, - }, - }) + _, err := c.CoreV1().Services(ns).Create(svc) if err != nil { return podNames, "", err } @@ -1465,3 +1456,97 @@ func GetServiceLoadBalancerCreationTimeout(cs clientset.Interface) time.Duration } return LoadBalancerCreateTimeoutDefault } + +// affinityTracker tracks the destination of a request for the affinity tests. +type affinityTracker struct { + hostTrace []string +} + +// Record the response going to a given host. +func (at *affinityTracker) recordHost(host string) { + at.hostTrace = append(at.hostTrace, host) +} + +// Check that we got a constant count requests going to the same host. +func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) { + fulfilled = (len(at.hostTrace) >= count) + if len(at.hostTrace) == 0 { + return fulfilled, true + } + last := at.hostTrace[0:] + if len(at.hostTrace)-count >= 0 { + last = at.hostTrace[len(at.hostTrace)-count:] + } + host := at.hostTrace[len(at.hostTrace)-1] + for _, h := range last { + if h != host { + return fulfilled, false + } + } + return fulfilled, true +} + +func checkAffinityFailed(tracker affinityTracker, err string) { + Logf("%v", tracker.hostTrace) + Failf(err) +} + +// CheckAffinity function tests whether the service affinity works as expected. +// If affinity is expected and transitionState is true, the test will +// return true once affinityConfirmCount number of same response observed in a +// row. If affinity is not expected, the test will keep observe until different +// responses observed. The function will return false only when no expected +// responses observed before timeout. If transitionState is false, the test will +// fail once different host is given if shouldHold is true. +func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIp string, targetPort int, shouldHold, transitionState bool) bool { + targetIpPort := net.JoinHostPort(targetIp, strconv.Itoa(targetPort)) + cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIpPort) + timeout := ServiceTestTimeout + if execPod == nil { + timeout = LoadBalancerPollTimeout + } + var tracker affinityTracker + if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) { + if execPod != nil { + if stdout, err := RunHostCmd(execPod.Namespace, execPod.Name, cmd); err != nil { + Logf("Failed to get response from %s. Retry until timeout", targetIpPort) + return false, nil + } else { + tracker.recordHost(stdout) + } + } else { + rawResponse := jig.GetHTTPContent(targetIp, targetPort, timeout, "") + tracker.recordHost(rawResponse.String()) + } + trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount) + if !shouldHold && !affinityHolds { + return true, nil + } + if shouldHold { + if !transitionState && !affinityHolds { + return true, fmt.Errorf("Affintity should hold but didn't.") + } + if trackerFulfilled && affinityHolds { + return true, nil + } + } + return false, nil + }); pollErr != nil { + trackerFulfilled, _ := tracker.checkHostTrace(AffinityConfirmCount) + if pollErr != wait.ErrWaitTimeout { + checkAffinityFailed(tracker, pollErr.Error()) + return false + } else { + if !trackerFulfilled { + checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", targetIpPort)) + } + if shouldHold { + checkAffinityFailed(tracker, "Affintity should hold but didn't.") + } else { + checkAffinityFailed(tracker, "Affintity shouldn't hold but did.") + } + return true + } + } + return true +} diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index eaecf2b12c6..4ee76c2a7c5 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -43,6 +43,37 @@ import ( . "github.com/onsi/gomega" ) +// TODO(yankaiz): Move constants and default settings to service_util.go. +const ( + defaultServeHostnameServicePort = 80 + defaultServeHostnameServiceName = "svc-hostname" +) + +var ( + defaultServeHostnameService = v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultServeHostnameServiceName, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{ + Port: int32(defaultServeHostnameServicePort), + TargetPort: intstr.FromInt(9376), + Protocol: "TCP", + }}, + Selector: map[string]string{ + "name": defaultServeHostnameServiceName, + }, + }, + } +) + +func getServeHostnameService(name string) *v1.Service { + svc := defaultServeHostnameService.DeepCopy() + svc.ObjectMeta.Name = name + svc.Spec.Selector["name"] = name + return svc +} + var _ = SIGDescribe("Services", func() { f := framework.NewDefaultFramework("services") @@ -285,13 +316,13 @@ var _ = SIGDescribe("Services", func() { framework.SkipUnlessSSHKeyPresent() ns := f.Namespace.Name - numPods, servicePort := 3, 80 + numPods, servicePort := 3, defaultServeHostnameServicePort By("creating service1 in namespace " + ns) - podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service1", servicePort, numPods) + podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service1"), ns, numPods) Expect(err).NotTo(HaveOccurred()) By("creating service2 in namespace " + ns) - podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service2", servicePort, numPods) + podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service2"), ns, numPods) Expect(err).NotTo(HaveOccurred()) hosts, err := framework.NodeSSHHosts(cs) @@ -318,7 +349,7 @@ var _ = SIGDescribe("Services", func() { // Start another service and verify both are up. By("creating service3 in namespace " + ns) - podNames3, svc3IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service3", servicePort, numPods) + podNames3, svc3IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service3"), ns, numPods) Expect(err).NotTo(HaveOccurred()) if svc2IP == svc3IP { @@ -337,7 +368,7 @@ var _ = SIGDescribe("Services", func() { framework.SkipUnlessProviderIs("gce", "gke") ns := f.Namespace.Name - numPods, servicePort := 3, 80 + numPods, servicePort := 3, defaultServeHostnameServicePort svc1 := "service1" svc2 := "service2" @@ -345,13 +376,13 @@ var _ = SIGDescribe("Services", func() { defer func() { framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, svc1)) }() - podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, svc1, servicePort, numPods) + podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService(svc1), ns, numPods) Expect(err).NotTo(HaveOccurred()) defer func() { framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, svc2)) }() - podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, svc2, servicePort, numPods) + podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService(svc2), ns, numPods) Expect(err).NotTo(HaveOccurred()) if svc1IP == svc2IP { @@ -398,7 +429,7 @@ var _ = SIGDescribe("Services", func() { defer func() { framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "service1")) }() - podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service1", servicePort, numPods) + podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service1"), ns, numPods) Expect(err).NotTo(HaveOccurred()) hosts, err := framework.NodeSSHHosts(cs) @@ -425,7 +456,7 @@ var _ = SIGDescribe("Services", func() { defer func() { framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "service2")) }() - podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service2", servicePort, numPods) + podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service2"), ns, numPods) Expect(err).NotTo(HaveOccurred()) if svc1IP == svc2IP { @@ -1513,6 +1544,58 @@ var _ = SIGDescribe("Services", func() { By("switching to ClusterIP type to destroy loadbalancer") jig.ChangeServiceType(svc.Namespace, svc.Name, v1.ServiceTypeClusterIP, createTimeout) }) + + It("should have session affinity work for service with type clusterIP", func() { + svc := getServeHostnameService("service") + svc.Spec.Type = v1.ServiceTypeClusterIP + execAffinityTestForNonLBService(f, cs, svc, false) + }) + + It("should be able to switch session affinity for service with type clusterIP", func() { + svc := getServeHostnameService("service") + svc.Spec.Type = v1.ServiceTypeClusterIP + execAffinityTestForNonLBService(f, cs, svc, true) + }) + + It("should have session affinity work for NodePort service", func() { + svc := getServeHostnameService("service") + svc.Spec.Type = v1.ServiceTypeNodePort + execAffinityTestForNonLBService(f, cs, svc, false) + }) + + It("should be able to switch session affinity for NodePort service", func() { + svc := getServeHostnameService("service") + svc.Spec.Type = v1.ServiceTypeNodePort + execAffinityTestForNonLBService(f, cs, svc, true) + }) + + It("should have session affinity work for LoadBalancer service with ESIPP on [Slow]", func() { + svc := getServeHostnameService("service") + svc.Spec.Type = v1.ServiceTypeLoadBalancer + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + execAffinityTestForLBService(f, cs, svc, false) + }) + + It("should be able to switch session affinity for LoadBalancer service with ESIPP on [Slow]", func() { + svc := getServeHostnameService("service") + svc.Spec.Type = v1.ServiceTypeLoadBalancer + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + execAffinityTestForLBService(f, cs, svc, true) + }) + + It("should have session affinity work for LoadBalancer service with ESIPP off [Slow]", func() { + svc := getServeHostnameService("service") + svc.Spec.Type = v1.ServiceTypeLoadBalancer + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeCluster + execAffinityTestForLBService(f, cs, svc, false) + }) + + It("should be able to switch session affinity for LoadBalancer service with ESIPP off [Slow]", func() { + svc := getServeHostnameService("service") + svc.Spec.Type = v1.ServiceTypeLoadBalancer + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeCluster + execAffinityTestForLBService(f, cs, svc, true) + }) }) // TODO: Get rid of [DisabledForLargeClusters] tag when issue #52495 is fixed. @@ -1867,3 +1950,92 @@ func execSourceipTest(f *framework.Framework, c clientset.Interface, ns, nodeNam } return execPod.Status.PodIP, outputs[1] } + +// execAffinityTestForNonLBService is a helper function that wrap the logic of +// affinity test for non-load-balancer services. Session afinity will be +// enabled when the service is created. If parameter isTransitionTest is true, +// session affinity will be switched off/on and test if the service converges +// to a stable affinity state. +func execAffinityTestForNonLBService(f *framework.Framework, cs clientset.Interface, svc *v1.Service, isTransitionTest bool) { + ns := f.Namespace.Name + numPods, servicePort, serviceName := 3, defaultServeHostnameServicePort, svc.ObjectMeta.Name + By("creating service in namespace " + ns) + serviceType := svc.Spec.Type + svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP + _, _, err := framework.StartServeHostnameService(cs, f.InternalClientset, svc, ns, numPods) + Expect(err).NotTo(HaveOccurred()) + defer func() { + framework.StopServeHostnameService(cs, f.InternalClientset, f.ScalesGetter, ns, serviceName) + }() + jig := framework.NewServiceTestJig(cs, serviceName) + svc, err = jig.Client.CoreV1().Services(ns).Get(serviceName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + var svcIp string + if serviceType == v1.ServiceTypeNodePort { + nodes := framework.GetReadySchedulableNodesOrDie(cs) + addrs := framework.CollectAddresses(nodes, v1.NodeInternalIP) + Expect(len(addrs)).To(BeNumerically(">", 0), "Failed to get Node internal IP") + svcIp = addrs[0] + servicePort = int(svc.Spec.Ports[0].NodePort) + } else { + svcIp = svc.Spec.ClusterIP + } + + execPodName := framework.CreateExecPodOrFail(cs, ns, "execpod-affinity", nil) + defer func() { + framework.Logf("Cleaning up the exec pod") + err := cs.CoreV1().Pods(ns).Delete(execPodName, nil) + Expect(err).NotTo(HaveOccurred()) + }() + execPod, err := cs.CoreV1().Pods(ns).Get(execPodName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + if !isTransitionTest { + Expect(framework.CheckAffinity(jig, execPod, svcIp, servicePort, true, false)).To(BeTrue()) + } + if isTransitionTest { + svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) { + svc.Spec.SessionAffinity = v1.ServiceAffinityNone + }) + Expect(framework.CheckAffinity(jig, execPod, svcIp, servicePort, false, true)).To(BeTrue()) + svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) { + svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP + }) + Expect(framework.CheckAffinity(jig, execPod, svcIp, servicePort, true, true)).To(BeTrue()) + } +} + +// execAffinityTestForLBService is a helper function that wrap the logic of +// affinity test for load balancer services, similar to +// execAffinityTestForNonLBService. +func execAffinityTestForLBService(f *framework.Framework, cs clientset.Interface, svc *v1.Service, isTransitionTest bool) { + numPods, ns, serviceName := 3, f.Namespace.Name, svc.ObjectMeta.Name + + By("creating service in namespace " + ns) + svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP + _, _, err := framework.StartServeHostnameService(cs, f.InternalClientset, svc, ns, numPods) + Expect(err).NotTo(HaveOccurred()) + jig := framework.NewServiceTestJig(cs, serviceName) + By("waiting for loadbalancer for service " + ns + "/" + serviceName) + svc = jig.WaitForLoadBalancerOrFail(ns, serviceName, framework.LoadBalancerCreateTimeoutDefault) + jig.SanityCheckService(svc, v1.ServiceTypeLoadBalancer) + defer func() { + framework.StopServeHostnameService(cs, f.InternalClientset, f.ScalesGetter, ns, serviceName) + }() + ingressIP := framework.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) + port := int(svc.Spec.Ports[0].Port) + + if !isTransitionTest { + Expect(framework.CheckAffinity(jig, nil, ingressIP, port, true, false)).To(BeTrue()) + } + if isTransitionTest { + svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) { + svc.Spec.SessionAffinity = v1.ServiceAffinityNone + }) + Expect(framework.CheckAffinity(jig, nil, ingressIP, port, false, true)).To(BeTrue()) + svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) { + svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP + }) + Expect(framework.CheckAffinity(jig, nil, ingressIP, port, true, true)).To(BeTrue()) + } +}