Merge pull request #61188 from grayluck/affinity-test

Automatic merge from submit-queue (batch tested with PRs 61848, 61188, 56363, 61357, 61838). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Affinity test

**What this PR does / why we need it**:
Add e2e test for service session affinity, including all three types of services and externalTravicPolicy=Local.

**Which issue(s) this PR fixes**:
Fixes #55520



**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-03-28 13:52:07 -07:00 committed by GitHub
commit f92ddc5dc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 284 additions and 27 deletions

View File

@ -95,6 +95,10 @@ const (
// GCPMaxInstancesInInstanceGroup is the maximum number of instances supported in
// one instance group on GCP.
GCPMaxInstancesInInstanceGroup = 2000
// AffinityConfirmCount is the number of needed continuous requests to confirm that
// affinity is enabled.
AffinityConfirmCount = 15
)
// This should match whatever the default/configured range is
@ -1207,26 +1211,13 @@ func ValidateEndpointsOrFail(c clientset.Interface, namespace, serviceName strin
Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, ServiceStartTimeout)
}
// StartServeHostnameService creates a replication controller that serves its hostname and a service on top of it.
func StartServeHostnameService(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, port, replicas int) ([]string, string, error) {
// StartServeHostnameService creates a replication controller that serves its
// hostname and a service on top of it.
func StartServeHostnameService(c clientset.Interface, internalClient internalclientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) {
podNames := make([]string, replicas)
name := svc.ObjectMeta.Name
By("creating service " + name + " in namespace " + ns)
_, err := c.CoreV1().Services(ns).Create(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Port: int32(port),
TargetPort: intstr.FromInt(9376),
Protocol: "TCP",
}},
Selector: map[string]string{
"name": name,
},
},
})
_, err := c.CoreV1().Services(ns).Create(svc)
if err != nil {
return podNames, "", err
}
@ -1465,3 +1456,97 @@ func GetServiceLoadBalancerCreationTimeout(cs clientset.Interface) time.Duration
}
return LoadBalancerCreateTimeoutDefault
}
// affinityTracker tracks the destination of a request for the affinity tests.
type affinityTracker struct {
hostTrace []string
}
// Record the response going to a given host.
func (at *affinityTracker) recordHost(host string) {
at.hostTrace = append(at.hostTrace, host)
}
// Check that we got a constant count requests going to the same host.
func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) {
fulfilled = (len(at.hostTrace) >= count)
if len(at.hostTrace) == 0 {
return fulfilled, true
}
last := at.hostTrace[0:]
if len(at.hostTrace)-count >= 0 {
last = at.hostTrace[len(at.hostTrace)-count:]
}
host := at.hostTrace[len(at.hostTrace)-1]
for _, h := range last {
if h != host {
return fulfilled, false
}
}
return fulfilled, true
}
func checkAffinityFailed(tracker affinityTracker, err string) {
Logf("%v", tracker.hostTrace)
Failf(err)
}
// CheckAffinity function tests whether the service affinity works as expected.
// If affinity is expected and transitionState is true, the test will
// return true once affinityConfirmCount number of same response observed in a
// row. If affinity is not expected, the test will keep observe until different
// responses observed. The function will return false only when no expected
// responses observed before timeout. If transitionState is false, the test will
// fail once different host is given if shouldHold is true.
func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIp string, targetPort int, shouldHold, transitionState bool) bool {
targetIpPort := net.JoinHostPort(targetIp, strconv.Itoa(targetPort))
cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIpPort)
timeout := ServiceTestTimeout
if execPod == nil {
timeout = LoadBalancerPollTimeout
}
var tracker affinityTracker
if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
if execPod != nil {
if stdout, err := RunHostCmd(execPod.Namespace, execPod.Name, cmd); err != nil {
Logf("Failed to get response from %s. Retry until timeout", targetIpPort)
return false, nil
} else {
tracker.recordHost(stdout)
}
} else {
rawResponse := jig.GetHTTPContent(targetIp, targetPort, timeout, "")
tracker.recordHost(rawResponse.String())
}
trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount)
if !shouldHold && !affinityHolds {
return true, nil
}
if shouldHold {
if !transitionState && !affinityHolds {
return true, fmt.Errorf("Affintity should hold but didn't.")
}
if trackerFulfilled && affinityHolds {
return true, nil
}
}
return false, nil
}); pollErr != nil {
trackerFulfilled, _ := tracker.checkHostTrace(AffinityConfirmCount)
if pollErr != wait.ErrWaitTimeout {
checkAffinityFailed(tracker, pollErr.Error())
return false
} else {
if !trackerFulfilled {
checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", targetIpPort))
}
if shouldHold {
checkAffinityFailed(tracker, "Affintity should hold but didn't.")
} else {
checkAffinityFailed(tracker, "Affintity shouldn't hold but did.")
}
return true
}
}
return true
}

View File

@ -43,6 +43,37 @@ import (
. "github.com/onsi/gomega"
)
// TODO(yankaiz): Move constants and default settings to service_util.go.
const (
defaultServeHostnameServicePort = 80
defaultServeHostnameServiceName = "svc-hostname"
)
var (
defaultServeHostnameService = v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: defaultServeHostnameServiceName,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Port: int32(defaultServeHostnameServicePort),
TargetPort: intstr.FromInt(9376),
Protocol: "TCP",
}},
Selector: map[string]string{
"name": defaultServeHostnameServiceName,
},
},
}
)
func getServeHostnameService(name string) *v1.Service {
svc := defaultServeHostnameService.DeepCopy()
svc.ObjectMeta.Name = name
svc.Spec.Selector["name"] = name
return svc
}
var _ = SIGDescribe("Services", func() {
f := framework.NewDefaultFramework("services")
@ -285,13 +316,13 @@ var _ = SIGDescribe("Services", func() {
framework.SkipUnlessSSHKeyPresent()
ns := f.Namespace.Name
numPods, servicePort := 3, 80
numPods, servicePort := 3, defaultServeHostnameServicePort
By("creating service1 in namespace " + ns)
podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service1", servicePort, numPods)
podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service1"), ns, numPods)
Expect(err).NotTo(HaveOccurred())
By("creating service2 in namespace " + ns)
podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service2", servicePort, numPods)
podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service2"), ns, numPods)
Expect(err).NotTo(HaveOccurred())
hosts, err := framework.NodeSSHHosts(cs)
@ -318,7 +349,7 @@ var _ = SIGDescribe("Services", func() {
// Start another service and verify both are up.
By("creating service3 in namespace " + ns)
podNames3, svc3IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service3", servicePort, numPods)
podNames3, svc3IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service3"), ns, numPods)
Expect(err).NotTo(HaveOccurred())
if svc2IP == svc3IP {
@ -337,7 +368,7 @@ var _ = SIGDescribe("Services", func() {
framework.SkipUnlessProviderIs("gce", "gke")
ns := f.Namespace.Name
numPods, servicePort := 3, 80
numPods, servicePort := 3, defaultServeHostnameServicePort
svc1 := "service1"
svc2 := "service2"
@ -345,13 +376,13 @@ var _ = SIGDescribe("Services", func() {
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, svc1))
}()
podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, svc1, servicePort, numPods)
podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService(svc1), ns, numPods)
Expect(err).NotTo(HaveOccurred())
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, svc2))
}()
podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, svc2, servicePort, numPods)
podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService(svc2), ns, numPods)
Expect(err).NotTo(HaveOccurred())
if svc1IP == svc2IP {
@ -398,7 +429,7 @@ var _ = SIGDescribe("Services", func() {
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "service1"))
}()
podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service1", servicePort, numPods)
podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service1"), ns, numPods)
Expect(err).NotTo(HaveOccurred())
hosts, err := framework.NodeSSHHosts(cs)
@ -425,7 +456,7 @@ var _ = SIGDescribe("Services", func() {
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "service2"))
}()
podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service2", servicePort, numPods)
podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, getServeHostnameService("service2"), ns, numPods)
Expect(err).NotTo(HaveOccurred())
if svc1IP == svc2IP {
@ -1513,6 +1544,58 @@ var _ = SIGDescribe("Services", func() {
By("switching to ClusterIP type to destroy loadbalancer")
jig.ChangeServiceType(svc.Namespace, svc.Name, v1.ServiceTypeClusterIP, createTimeout)
})
It("should have session affinity work for service with type clusterIP", func() {
svc := getServeHostnameService("service")
svc.Spec.Type = v1.ServiceTypeClusterIP
execAffinityTestForNonLBService(f, cs, svc, false)
})
It("should be able to switch session affinity for service with type clusterIP", func() {
svc := getServeHostnameService("service")
svc.Spec.Type = v1.ServiceTypeClusterIP
execAffinityTestForNonLBService(f, cs, svc, true)
})
It("should have session affinity work for NodePort service", func() {
svc := getServeHostnameService("service")
svc.Spec.Type = v1.ServiceTypeNodePort
execAffinityTestForNonLBService(f, cs, svc, false)
})
It("should be able to switch session affinity for NodePort service", func() {
svc := getServeHostnameService("service")
svc.Spec.Type = v1.ServiceTypeNodePort
execAffinityTestForNonLBService(f, cs, svc, true)
})
It("should have session affinity work for LoadBalancer service with ESIPP on [Slow]", func() {
svc := getServeHostnameService("service")
svc.Spec.Type = v1.ServiceTypeLoadBalancer
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
execAffinityTestForLBService(f, cs, svc, false)
})
It("should be able to switch session affinity for LoadBalancer service with ESIPP on [Slow]", func() {
svc := getServeHostnameService("service")
svc.Spec.Type = v1.ServiceTypeLoadBalancer
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
execAffinityTestForLBService(f, cs, svc, true)
})
It("should have session affinity work for LoadBalancer service with ESIPP off [Slow]", func() {
svc := getServeHostnameService("service")
svc.Spec.Type = v1.ServiceTypeLoadBalancer
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeCluster
execAffinityTestForLBService(f, cs, svc, false)
})
It("should be able to switch session affinity for LoadBalancer service with ESIPP off [Slow]", func() {
svc := getServeHostnameService("service")
svc.Spec.Type = v1.ServiceTypeLoadBalancer
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeCluster
execAffinityTestForLBService(f, cs, svc, true)
})
})
// TODO: Get rid of [DisabledForLargeClusters] tag when issue #52495 is fixed.
@ -1867,3 +1950,92 @@ func execSourceipTest(f *framework.Framework, c clientset.Interface, ns, nodeNam
}
return execPod.Status.PodIP, outputs[1]
}
// execAffinityTestForNonLBService is a helper function that wrap the logic of
// affinity test for non-load-balancer services. Session afinity will be
// enabled when the service is created. If parameter isTransitionTest is true,
// session affinity will be switched off/on and test if the service converges
// to a stable affinity state.
func execAffinityTestForNonLBService(f *framework.Framework, cs clientset.Interface, svc *v1.Service, isTransitionTest bool) {
ns := f.Namespace.Name
numPods, servicePort, serviceName := 3, defaultServeHostnameServicePort, svc.ObjectMeta.Name
By("creating service in namespace " + ns)
serviceType := svc.Spec.Type
svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
_, _, err := framework.StartServeHostnameService(cs, f.InternalClientset, svc, ns, numPods)
Expect(err).NotTo(HaveOccurred())
defer func() {
framework.StopServeHostnameService(cs, f.InternalClientset, f.ScalesGetter, ns, serviceName)
}()
jig := framework.NewServiceTestJig(cs, serviceName)
svc, err = jig.Client.CoreV1().Services(ns).Get(serviceName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
var svcIp string
if serviceType == v1.ServiceTypeNodePort {
nodes := framework.GetReadySchedulableNodesOrDie(cs)
addrs := framework.CollectAddresses(nodes, v1.NodeInternalIP)
Expect(len(addrs)).To(BeNumerically(">", 0), "Failed to get Node internal IP")
svcIp = addrs[0]
servicePort = int(svc.Spec.Ports[0].NodePort)
} else {
svcIp = svc.Spec.ClusterIP
}
execPodName := framework.CreateExecPodOrFail(cs, ns, "execpod-affinity", nil)
defer func() {
framework.Logf("Cleaning up the exec pod")
err := cs.CoreV1().Pods(ns).Delete(execPodName, nil)
Expect(err).NotTo(HaveOccurred())
}()
execPod, err := cs.CoreV1().Pods(ns).Get(execPodName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
if !isTransitionTest {
Expect(framework.CheckAffinity(jig, execPod, svcIp, servicePort, true, false)).To(BeTrue())
}
if isTransitionTest {
svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
svc.Spec.SessionAffinity = v1.ServiceAffinityNone
})
Expect(framework.CheckAffinity(jig, execPod, svcIp, servicePort, false, true)).To(BeTrue())
svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
})
Expect(framework.CheckAffinity(jig, execPod, svcIp, servicePort, true, true)).To(BeTrue())
}
}
// execAffinityTestForLBService is a helper function that wrap the logic of
// affinity test for load balancer services, similar to
// execAffinityTestForNonLBService.
func execAffinityTestForLBService(f *framework.Framework, cs clientset.Interface, svc *v1.Service, isTransitionTest bool) {
numPods, ns, serviceName := 3, f.Namespace.Name, svc.ObjectMeta.Name
By("creating service in namespace " + ns)
svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
_, _, err := framework.StartServeHostnameService(cs, f.InternalClientset, svc, ns, numPods)
Expect(err).NotTo(HaveOccurred())
jig := framework.NewServiceTestJig(cs, serviceName)
By("waiting for loadbalancer for service " + ns + "/" + serviceName)
svc = jig.WaitForLoadBalancerOrFail(ns, serviceName, framework.LoadBalancerCreateTimeoutDefault)
jig.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
defer func() {
framework.StopServeHostnameService(cs, f.InternalClientset, f.ScalesGetter, ns, serviceName)
}()
ingressIP := framework.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
port := int(svc.Spec.Ports[0].Port)
if !isTransitionTest {
Expect(framework.CheckAffinity(jig, nil, ingressIP, port, true, false)).To(BeTrue())
}
if isTransitionTest {
svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
svc.Spec.SessionAffinity = v1.ServiceAffinityNone
})
Expect(framework.CheckAffinity(jig, nil, ingressIP, port, false, true)).To(BeTrue())
svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
})
Expect(framework.CheckAffinity(jig, nil, ingressIP, port, true, true)).To(BeTrue())
}
}