Merge pull request #54111 from freehan/neg-test

Automatic merge from submit-queue (batch tested with PRs 54107, 54184, 54377, 54094, 54111). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

add e2e tests for NEG

This PR includes tests:
- ingress conformance test
- scaling up and down backends
- switching backend between IG and NEG
- rolling update backend should not cause service disruption

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-10-24 15:59:17 -07:00 committed by GitHub
commit 8d3a19229f
7 changed files with 368 additions and 16 deletions

View File

@ -74,7 +74,7 @@ func setupProviderConfig() error {
managedZones = []string{zone}
}
gceAlphaFeatureGate, err := gcecloud.NewAlphaFeatureGate([]string{})
gceAlphaFeatureGate, err := gcecloud.NewAlphaFeatureGate([]string{gcecloud.AlphaFeatureNetworkEndpointGroup})
if err != nil {
glog.Errorf("Encountered error for creating alpha feature gate: %v", err)
}

View File

@ -126,7 +126,7 @@ type IngressConformanceTests struct {
// CreateIngressComformanceTests generates an slice of sequential test cases:
// a simple http ingress, ingress with HTTPS, ingress HTTPS with a modified hostname,
// ingress https with a modified URLMap
func CreateIngressComformanceTests(jig *IngressTestJig, ns string) []IngressConformanceTests {
func CreateIngressComformanceTests(jig *IngressTestJig, ns string, annotations map[string]string) []IngressConformanceTests {
manifestPath := filepath.Join(IngressManifestPath, "http")
// These constants match the manifests used in IngressManifestPath
tlsHost := "foo.bar.com"
@ -138,7 +138,7 @@ func CreateIngressComformanceTests(jig *IngressTestJig, ns string) []IngressConf
return []IngressConformanceTests{
{
fmt.Sprintf("should create a basic HTTP ingress"),
func() { jig.CreateIngress(manifestPath, ns, map[string]string{}) },
func() { jig.CreateIngress(manifestPath, ns, annotations, annotations) },
fmt.Sprintf("waiting for urls on basic HTTP ingress"),
},
{
@ -591,6 +591,39 @@ func (cont *GCEIngressController) deleteInstanceGroup(del bool) (msg string) {
return msg
}
func (cont *GCEIngressController) deleteNetworkEndpointGroup(del bool) (msg string) {
gceCloud := cont.Cloud.Provider.(*gcecloud.GCECloud)
// TODO: E2E cloudprovider has only 1 zone, but the cluster can have many.
// We need to poll on all NEGs across all zones.
negList, err := gceCloud.ListNetworkEndpointGroup(cont.Cloud.Zone)
if err != nil {
if cont.isHTTPErrorCode(err, http.StatusNotFound) {
return msg
}
// Do not return error as NEG is still alpha.
Logf("Failed to list network endpoint group: %v", err)
return msg
}
if len(negList) == 0 {
return msg
}
for _, neg := range negList {
if !cont.canDeleteNEG(neg.Name, neg.CreationTimestamp, del) {
continue
}
if del {
Logf("Deleting network-endpoint-group: %s", neg.Name)
if err := gceCloud.DeleteNetworkEndpointGroup(neg.Name, cont.Cloud.Zone); err != nil &&
!cont.isHTTPErrorCode(err, http.StatusNotFound) {
msg += fmt.Sprintf("Failed to delete network endpoint group %v\n", neg.Name)
}
} else {
msg += fmt.Sprintf("%v (network-endpoint-group)\n", neg.Name)
}
}
return msg
}
// canDelete returns true if either the name ends in a suffix matching this
// controller's UID, or the creationTimestamp exceeds the maxAge and del is set
// to true. Always returns false if the name doesn't match that we expect for
@ -617,6 +650,28 @@ func (cont *GCEIngressController) canDelete(resourceName, creationTimestamp stri
if !delOldResources {
return false
}
return canDeleteWithTimestamp(resourceName, creationTimestamp)
}
// canDeleteNEG returns true if either the name contains this controller's UID,
// or the creationTimestamp exceeds the maxAge and del is set to true.
func (cont *GCEIngressController) canDeleteNEG(resourceName, creationTimestamp string, delOldResources bool) bool {
if !strings.HasPrefix(resourceName, "k8s") {
return false
}
if strings.Contains(resourceName, cont.UID) {
return true
}
if !delOldResources {
return false
}
return canDeleteWithTimestamp(resourceName, creationTimestamp)
}
func canDeleteWithTimestamp(resourceName, creationTimestamp string) bool {
createdTime, err := time.Parse(time.RFC3339, creationTimestamp)
if err != nil {
Logf("WARNING: Failed to parse creation timestamp %v for %v: %v", creationTimestamp, resourceName, err)
@ -667,6 +722,44 @@ func (cont *GCEIngressController) isHTTPErrorCode(err error, code int) bool {
return ok && apiErr.Code == code
}
// BackendServiceUsingNEG returns true only if all global backend service with matching nodeports pointing to NEG as backend
func (cont *GCEIngressController) BackendServiceUsingNEG(nodeports []string) (bool, error) {
return cont.backendMode(nodeports, "networkEndpointGroups")
}
// BackendServiceUsingIG returns true only if all global backend service with matching nodeports pointing to IG as backend
func (cont *GCEIngressController) BackendServiceUsingIG(nodeports []string) (bool, error) {
return cont.backendMode(nodeports, "instanceGroups")
}
func (cont *GCEIngressController) backendMode(nodeports []string, keyword string) (bool, error) {
gceCloud := cont.Cloud.Provider.(*gcecloud.GCECloud)
beList, err := gceCloud.ListGlobalBackendServices()
if err != nil {
return false, fmt.Errorf("failed to list backend services: %v", err)
}
matchingBackendService := 0
for _, bs := range beList.Items {
match := false
for _, np := range nodeports {
// Warning: This assumes backend service naming convention includes nodeport in the name
if strings.Contains(bs.Name, np) {
match = true
matchingBackendService += 1
}
}
if match {
for _, be := range bs.Backends {
if !strings.Contains(be.Group, keyword) {
return false, nil
}
}
}
}
return matchingBackendService == len(nodeports), nil
}
// Cleanup cleans up cloud resources.
// If del is false, it simply reports existing resources without deleting them.
// If dle is true, it deletes resources it finds acceptable (see canDelete func).
@ -683,6 +776,7 @@ func (cont *GCEIngressController) Cleanup(del bool) error {
errMsg += cont.deleteHTTPHealthCheck(del)
errMsg += cont.deleteInstanceGroup(del)
errMsg += cont.deleteNetworkEndpointGroup(del)
errMsg += cont.deleteFirewallRule(del)
errMsg += cont.deleteSSLCertificate(del)
@ -812,7 +906,9 @@ func GcloudComputeResourceCreate(resource, name, project string, args ...string)
// Required: ing.yaml, rc.yaml, svc.yaml must exist in manifestPath
// Optional: secret.yaml, ingAnnotations
// If ingAnnotations is specified it will overwrite any annotations in ing.yaml
func (j *IngressTestJig) CreateIngress(manifestPath, ns string, ingAnnotations map[string]string) {
// If svcAnnotations is specified it will overwrite any annotations in svc.yaml
func (j *IngressTestJig) CreateIngress(manifestPath, ns string, ingAnnotations map[string]string, svcAnnotations map[string]string) {
var err error
mkpath := func(file string) string {
return filepath.Join(TestContext.RepoRoot, manifestPath, file)
}
@ -822,13 +918,22 @@ func (j *IngressTestJig) CreateIngress(manifestPath, ns string, ingAnnotations m
Logf("creating service")
RunKubectlOrDie("create", "-f", mkpath("svc.yaml"), fmt.Sprintf("--namespace=%v", ns))
if len(svcAnnotations) > 0 {
svcList, err := j.Client.CoreV1().Services(ns).List(metav1.ListOptions{})
ExpectNoError(err)
for _, svc := range svcList.Items {
svc.Annotations = svcAnnotations
_, err = j.Client.CoreV1().Services(ns).Update(&svc)
ExpectNoError(err)
}
}
if exists, _ := utilfile.FileExists(mkpath("secret.yaml")); exists {
Logf("creating secret")
RunKubectlOrDie("create", "-f", mkpath("secret.yaml"), fmt.Sprintf("--namespace=%v", ns))
}
Logf("Parsing ingress from %v", filepath.Join(manifestPath, "ing.yaml"))
var err error
j.Ingress, err = manifest.IngressFromManifest(filepath.Join(manifestPath, "ing.yaml"))
ExpectNoError(err)
j.Ingress.Namespace = ns
@ -954,14 +1059,16 @@ func (j *IngressTestJig) pollServiceNodePort(ns, name string, port int) {
ExpectNoError(PollURL(u, "", 30*time.Second, j.PollInterval, &http.Client{Timeout: IngressReqTimeout}, false))
}
// GetIngressNodePorts returns all related backend services' nodePorts.
// GetIngressNodePorts returns related backend services' nodePorts.
// Current GCE ingress controller allows traffic to the default HTTP backend
// by default, so retrieve its nodePort as well.
func (j *IngressTestJig) GetIngressNodePorts() []string {
// by default, so retrieve its nodePort if includeDefaultBackend is true.
func (j *IngressTestJig) GetIngressNodePorts(includeDefaultBackend bool) []string {
nodePorts := []string{}
defaultSvc, err := j.Client.Core().Services(metav1.NamespaceSystem).Get(defaultBackendName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
nodePorts = append(nodePorts, strconv.Itoa(int(defaultSvc.Spec.Ports[0].NodePort)))
if includeDefaultBackend {
defaultSvc, err := j.Client.Core().Services(metav1.NamespaceSystem).Get(defaultBackendName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
nodePorts = append(nodePorts, strconv.Itoa(int(defaultSvc.Spec.Ports[0].NodePort)))
}
backendSvcs := []string{}
if j.Ingress.Spec.Backend != nil {
@ -982,7 +1089,7 @@ func (j *IngressTestJig) GetIngressNodePorts() []string {
// ConstructFirewallForIngress returns the expected GCE firewall rule for the ingress resource
func (j *IngressTestJig) ConstructFirewallForIngress(gceController *GCEIngressController, nodeTags []string) *compute.Firewall {
nodePorts := j.GetIngressNodePorts()
nodePorts := j.GetIngressNodePorts(true)
fw := compute.Firewall{}
fw.Name = gceController.GetFirewallRuleName()
@ -997,6 +1104,28 @@ func (j *IngressTestJig) ConstructFirewallForIngress(gceController *GCEIngressCo
return &fw
}
// GetDistinctResponseFromIngress tries GET call to the ingress VIP and return all distinct responses.
func (j *IngressTestJig) GetDistinctResponseFromIngress() (sets.String, error) {
// Wait for the loadbalancer IP.
address, err := WaitForIngressAddress(j.Client, j.Ingress.Namespace, j.Ingress.Name, LoadBalancerPollTimeout)
if err != nil {
Failf("Ingress failed to acquire an IP address within %v", LoadBalancerPollTimeout)
}
responses := sets.NewString()
timeoutClient := &http.Client{Timeout: IngressReqTimeout}
for i := 0; i < 100; i++ {
url := fmt.Sprintf("http://%v", address)
res, err := SimpleGET(timeoutClient, url, "")
if err != nil {
Logf("Failed to GET %q. Got responses: %q: %v", url, res, err)
return responses, err
}
responses.Insert(res)
}
return responses, nil
}
func (cont *GCEIngressController) getL7AddonUID() (string, error) {
Logf("Retrieving UID from config map: %v/%v", metav1.NamespaceSystem, uidConfigMap)
cm, err := cont.Client.Core().ConfigMaps(metav1.NamespaceSystem).Get(uidConfigMap, metav1.GetOptions{})

View File

@ -22,7 +22,9 @@ import (
"time"
rbacv1beta1 "k8s.io/api/rbac/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/serviceaccount"
"k8s.io/kubernetes/test/e2e/framework"
@ -30,6 +32,11 @@ import (
. "github.com/onsi/gomega"
)
const (
NEGAnnotation = "alpha.cloud.google.com/load-balancer-neg"
NEGUpdateTimeout = 2 * time.Minute
)
var _ = SIGDescribe("Loadbalancing: L7", func() {
defer GinkgoRecover()
var (
@ -96,7 +103,7 @@ var _ = SIGDescribe("Loadbalancing: L7", func() {
})
It("should conform to Ingress spec", func() {
conformanceTests = framework.CreateIngressComformanceTests(jig, ns)
conformanceTests = framework.CreateIngressComformanceTests(jig, ns, map[string]string{})
for _, t := range conformanceTests {
By(t.EntryLog)
t.Execute()
@ -113,7 +120,7 @@ var _ = SIGDescribe("Loadbalancing: L7", func() {
jig.CreateIngress(filepath.Join(framework.IngressManifestPath, "static-ip"), ns, map[string]string{
"kubernetes.io/ingress.global-static-ip-name": ns,
"kubernetes.io/ingress.allow-http": "false",
})
}, map[string]string{})
By("waiting for Ingress to come up with ip: " + ip)
httpClient := framework.BuildInsecureClient(framework.IngressReqTimeout)
@ -149,6 +156,182 @@ var _ = SIGDescribe("Loadbalancing: L7", func() {
// TODO: Implement a multizone e2e that verifies traffic reaches each
// zone based on pod labels.
})
Describe("GCE [Slow] [Feature:NEG]", func() {
var gceController *framework.GCEIngressController
// Platform specific setup
BeforeEach(func() {
framework.SkipUnlessProviderIs("gce", "gke")
By("Initializing gce controller")
gceController = &framework.GCEIngressController{
Ns: ns,
Client: jig.Client,
Cloud: framework.TestContext.CloudConfig,
}
gceController.Init()
})
// Platform specific cleanup
AfterEach(func() {
if CurrentGinkgoTestDescription().Failed {
framework.DescribeIng(ns)
}
if jig.Ingress == nil {
By("No ingress created, no cleanup necessary")
return
}
By("Deleting ingress")
jig.TryDeleteIngress()
By("Cleaning up cloud resources")
framework.CleanupGCEIngressController(gceController)
})
It("should conform to Ingress spec", func() {
jig.PollInterval = 5 * time.Second
conformanceTests = framework.CreateIngressComformanceTests(jig, ns, map[string]string{
NEGAnnotation: "true",
})
for _, t := range conformanceTests {
By(t.EntryLog)
t.Execute()
By(t.ExitLog)
jig.WaitForIngress(true)
usingNeg, err := gceController.BackendServiceUsingNEG(jig.GetIngressNodePorts(false))
Expect(err).NotTo(HaveOccurred())
Expect(usingNeg).To(BeTrue())
}
})
It("should be able to switch between IG and NEG modes", func() {
var err error
By("Create a basic HTTP ingress using NEG")
jig.CreateIngress(filepath.Join(framework.IngressManifestPath, "neg"), ns, map[string]string{}, map[string]string{})
jig.WaitForIngress(true)
usingNEG, err := gceController.BackendServiceUsingNEG(jig.GetIngressNodePorts(false))
Expect(err).NotTo(HaveOccurred())
Expect(usingNEG).To(BeTrue())
By("Switch backend service to use IG")
svcList, err := f.ClientSet.CoreV1().Services(ns).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
for _, svc := range svcList.Items {
svc.Annotations[NEGAnnotation] = "false"
_, err = f.ClientSet.CoreV1().Services(ns).Update(&svc)
Expect(err).NotTo(HaveOccurred())
}
wait.Poll(5*time.Second, framework.LoadBalancerPollTimeout, func() (bool, error) {
return gceController.BackendServiceUsingIG(jig.GetIngressNodePorts(true))
})
jig.WaitForIngress(true)
By("Switch backend service to use NEG")
svcList, err = f.ClientSet.CoreV1().Services(ns).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
for _, svc := range svcList.Items {
svc.Annotations[NEGAnnotation] = "true"
_, err = f.ClientSet.CoreV1().Services(ns).Update(&svc)
Expect(err).NotTo(HaveOccurred())
}
wait.Poll(5*time.Second, framework.LoadBalancerPollTimeout, func() (bool, error) {
return gceController.BackendServiceUsingNEG(jig.GetIngressNodePorts(false))
})
jig.WaitForIngress(true)
})
It("should sync endpoints to NEG", func() {
name := "hostname"
scaleAndValidateNEG := func(num int) {
scale, err := f.ClientSet.ExtensionsV1beta1().Deployments(ns).GetScale(name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
if scale.Spec.Replicas != int32(num) {
scale.Spec.Replicas = int32(num)
_, err = f.ClientSet.ExtensionsV1beta1().Deployments(ns).UpdateScale(name, scale)
Expect(err).NotTo(HaveOccurred())
}
wait.Poll(5*time.Second, NEGUpdateTimeout, func() (bool, error) {
res, err := jig.GetDistinctResponseFromIngress()
if err != nil {
return false, err
}
return res.Len() == num, err
})
}
By("Create a basic HTTP ingress using NEG")
jig.CreateIngress(filepath.Join(framework.IngressManifestPath, "neg"), ns, map[string]string{}, map[string]string{})
jig.WaitForIngress(true)
usingNEG, err := gceController.BackendServiceUsingNEG(jig.GetIngressNodePorts(false))
Expect(err).NotTo(HaveOccurred())
Expect(usingNEG).To(BeTrue())
// initial replicas number is 1
scaleAndValidateNEG(1)
By("Scale up number of backends to 5")
scaleAndValidateNEG(5)
By("Scale down number of backends to 3")
scaleAndValidateNEG(3)
By("Scale up number of backends to 6")
scaleAndValidateNEG(6)
By("Scale down number of backends to 2")
scaleAndValidateNEG(3)
})
It("rolling update backend pods should not cause service disruption", func() {
name := "hostname"
replicas := 8
By("Create a basic HTTP ingress using NEG")
jig.CreateIngress(filepath.Join(framework.IngressManifestPath, "neg"), ns, map[string]string{}, map[string]string{})
jig.WaitForIngress(true)
usingNEG, err := gceController.BackendServiceUsingNEG(jig.GetIngressNodePorts(false))
Expect(err).NotTo(HaveOccurred())
Expect(usingNEG).To(BeTrue())
By(fmt.Sprintf("Scale backend replicas to %d", replicas))
scale, err := f.ClientSet.ExtensionsV1beta1().Deployments(ns).GetScale(name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
scale.Spec.Replicas = int32(replicas)
_, err = f.ClientSet.ExtensionsV1beta1().Deployments(ns).UpdateScale(name, scale)
Expect(err).NotTo(HaveOccurred())
wait.Poll(5*time.Second, framework.LoadBalancerPollTimeout, func() (bool, error) {
res, err := jig.GetDistinctResponseFromIngress()
if err != nil {
return false, err
}
return res.Len() == replicas, err
})
By("Trigger rolling update and observe service disruption")
deploy, err := f.ClientSet.ExtensionsV1beta1().Deployments(ns).Get(name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
// trigger by changing graceful termination period to 60 seconds
gracePeriod := int64(60)
deploy.Spec.Template.Spec.TerminationGracePeriodSeconds = &gracePeriod
_, err = f.ClientSet.ExtensionsV1beta1().Deployments(ns).Update(deploy)
Expect(err).NotTo(HaveOccurred())
wait.Poll(30*time.Second, framework.LoadBalancerPollTimeout, func() (bool, error) {
res, err := jig.GetDistinctResponseFromIngress()
Expect(err).NotTo(HaveOccurred())
deploy, err := f.ClientSet.ExtensionsV1beta1().Deployments(ns).Get(name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
if int(deploy.Status.UpdatedReplicas) == replicas {
if res.Len() == replicas {
return true, nil
} else {
framework.Logf("Expecting %d different responses, but got %d.", replicas, res.Len())
return false, nil
}
} else {
framework.Logf("Waiting for rolling update to finished. Keep sending traffic.")
return false, nil
}
})
})
})
// Time: borderline 5m, slow by design
Describe("[Slow] Nginx", func() {
@ -191,7 +374,7 @@ var _ = SIGDescribe("Loadbalancing: L7", func() {
// Poll more frequently to reduce e2e completion time.
// This test runs in presubmit.
jig.PollInterval = 5 * time.Second
conformanceTests = framework.CreateIngressComformanceTests(jig, ns)
conformanceTests = framework.CreateIngressComformanceTests(jig, ns, map[string]string{})
for _, t := range conformanceTests {
By(t.EntryLog)
t.Execute()

View File

@ -0,0 +1,8 @@
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
name: hostname
spec:
backend:
serviceName: hostname
servicePort: 80

View File

@ -0,0 +1,17 @@
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
labels:
run: hostname
name: hostname
spec:
template:
metadata:
labels:
run: hostname
spec:
containers:
- image: gcr.io/kubernetes-e2e-test-images/serve-hostname-amd64:1.1
imagePullPolicy: IfNotPresent
name: hostname
terminationGracePeriodSeconds: 120

View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: hostname
annotations:
alpha.cloud.google.com/load-balancer-neg: "true"
spec:
ports:
- port: 80
protocol: TCP
targetPort: 9376
selector:
run: hostname
sessionAffinity: None
type: NodePort

View File

@ -69,7 +69,7 @@ func (t *IngressUpgradeTest) Setup(f *framework.Framework) {
jig.CreateIngress(filepath.Join(framework.IngressManifestPath, "static-ip"), ns.Name, map[string]string{
"kubernetes.io/ingress.global-static-ip-name": t.ipName,
"kubernetes.io/ingress.allow-http": "false",
})
}, map[string]string{})
By("waiting for Ingress to come up with ip: " + t.ip)
framework.ExpectNoError(framework.PollURL(fmt.Sprintf("https://%v/", t.ip), "", framework.LoadBalancerPollTimeout, jig.PollInterval, t.httpClient, false))