Merge pull request #59313 from MrHohn/e2e-ingress-scale-new

Automatic merge from submit-queue (batch tested with PRs 59424, 59672, 59313, 59661). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

[e2e gce-ingress] Scale test to measure ingress create/update latency

**What this PR does / why we need it**:
Adding a basic scale test. Test procedure:
- Create O(1) ingresses, measure creation latency for each ingress.
- Create and update one more ingress, do similar measurement on create & update latency.
- Repeat first two steps with O(10) ingresses.
- Repeat first two steps with O(100) ingresses.

Couple side notes:
- Each ingress reference a separate service.
- All services share the same set of backend pods.
- All ingress share one TLS secret.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #NONE 

**Special notes for your reviewer**:
/assign @rramkumar1 @nicksardo @bowei

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-02-09 14:46:34 -08:00 committed by GitHub
commit afa8c4fee5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 975 additions and 112 deletions

View File

@ -35,6 +35,8 @@ import (
"strings"
"time"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
"k8s.io/api/core/v1"
@ -114,19 +116,29 @@ const (
nameLenLimit = 62
)
// IngressTestJig holds the relevant state and parameters of the ingress test.
type IngressTestJig struct {
Client clientset.Interface
RootCAs map[string][]byte
Address string
Ingress *extensions.Ingress
// class is the value of the annotation keyed under
// `kubernetes.io/ingress.class`. It's added to all ingresses created by
// this jig.
Class string
type TestLogger interface {
Infof(format string, args ...interface{})
Errorf(format string, args ...interface{})
}
// The interval used to poll urls
PollInterval time.Duration
type GLogger struct{}
func (l *GLogger) Infof(format string, args ...interface{}) {
glog.Infof(format, args...)
}
func (l *GLogger) Errorf(format string, args ...interface{}) {
glog.Errorf(format, args...)
}
type E2ELogger struct{}
func (l *E2ELogger) Infof(format string, args ...interface{}) {
Logf(format, args...)
}
func (l *E2ELogger) Errorf(format string, args ...interface{}) {
Logf(format, args...)
}
// IngressConformanceTests contains a closure with an entry and exit log line.
@ -302,12 +314,11 @@ func BuildInsecureClient(timeout time.Duration) *http.Client {
return &http.Client{Timeout: timeout, Transport: utilnet.SetTransportDefaults(t)}
}
// createIngressTLSSecret creates a secret containing TLS certificates for the given Ingress.
// createTLSSecret creates a secret containing TLS certificates.
// If a secret with the same name already pathExists in the namespace of the
// Ingress, it's updated.
func createIngressTLSSecret(kubeClient clientset.Interface, ing *extensions.Ingress) (host string, rootCA, privKey []byte, err error) {
tls := ing.Spec.TLS[0]
host = strings.Join(tls.Hosts, ",")
func createTLSSecret(kubeClient clientset.Interface, namespace, secretName string, hosts ...string) (host string, rootCA, privKey []byte, err error) {
host = strings.Join(hosts, ",")
Logf("Generating RSA cert for host %v", host)
cert, key, err := GenerateRSACerts(host, true)
if err != nil {
@ -315,7 +326,7 @@ func createIngressTLSSecret(kubeClient clientset.Interface, ing *extensions.Ingr
}
secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: tls.SecretName,
Name: secretName,
},
Data: map[string][]byte{
v1.TLSCertKey: cert,
@ -323,23 +334,39 @@ func createIngressTLSSecret(kubeClient clientset.Interface, ing *extensions.Ingr
},
}
var s *v1.Secret
if s, err = kubeClient.CoreV1().Secrets(ing.Namespace).Get(tls.SecretName, metav1.GetOptions{}); err == nil {
if s, err = kubeClient.CoreV1().Secrets(namespace).Get(secretName, metav1.GetOptions{}); err == nil {
// TODO: Retry the update. We don't really expect anything to conflict though.
Logf("Updating secret %v in ns %v with hosts %v for ingress %v", secret.Name, secret.Namespace, host, ing.Name)
Logf("Updating secret %v in ns %v with hosts %v", secret.Name, namespace, host)
s.Data = secret.Data
_, err = kubeClient.CoreV1().Secrets(ing.Namespace).Update(s)
_, err = kubeClient.CoreV1().Secrets(namespace).Update(s)
} else {
Logf("Creating secret %v in ns %v with hosts %v for ingress %v", secret.Name, secret.Namespace, host, ing.Name)
_, err = kubeClient.CoreV1().Secrets(ing.Namespace).Create(secret)
Logf("Creating secret %v in ns %v with hosts %v", secret.Name, namespace, host)
_, err = kubeClient.CoreV1().Secrets(namespace).Create(secret)
}
return host, cert, key, err
}
// CleanupGCEIngressController calls the GCEIngressController.Cleanup(false)
// GCEIngressController manages implementation details of Ingress on GCE/GKE.
type GCEIngressController struct {
Ns string
rcPath string
UID string
staticIPName string
rc *v1.ReplicationController
svc *v1.Service
Client clientset.Interface
Cloud CloudConfig
}
func (cont *GCEIngressController) CleanupGCEIngressController() error {
return cont.CleanupGCEIngressControllerWithTimeout(LoadBalancerCleanupTimeout)
}
// CleanupGCEIngressControllerWithTimeout calls the GCEIngressController.Cleanup(false)
// followed with deleting the static ip, and then a final GCEIngressController.Cleanup(true)
func CleanupGCEIngressController(gceController *GCEIngressController) {
pollErr := wait.Poll(5*time.Second, LoadBalancerCleanupTimeout, func() (bool, error) {
if err := gceController.Cleanup(false); err != nil {
func (cont *GCEIngressController) CleanupGCEIngressControllerWithTimeout(timeout time.Duration) error {
pollErr := wait.Poll(5*time.Second, timeout, func() (bool, error) {
if err := cont.Cleanup(false); err != nil {
Logf("Monitoring glbc's cleanup of gce resources:\n%v", err)
return false, nil
}
@ -349,7 +376,7 @@ func CleanupGCEIngressController(gceController *GCEIngressController) {
// Always try to cleanup even if pollErr == nil, because the cleanup
// routine also purges old leaked resources based on creation timestamp.
By("Performing final delete of any remaining resources")
if cleanupErr := gceController.Cleanup(true); cleanupErr != nil {
if cleanupErr := cont.Cleanup(true); cleanupErr != nil {
By(fmt.Sprintf("WARNING: possibly leaked resources: %v\n", cleanupErr))
} else {
By("No resources leaked.")
@ -360,7 +387,7 @@ func CleanupGCEIngressController(gceController *GCEIngressController) {
// to cleanup or it might interfere with the controller, causing it to
// throw out confusing events.
if ipErr := wait.Poll(5*time.Second, 1*time.Minute, func() (bool, error) {
if err := gceController.deleteStaticIPs(); err != nil {
if err := cont.deleteStaticIPs(); err != nil {
Logf("Failed to delete static-ip: %v\n", err)
return false, nil
}
@ -374,8 +401,21 @@ func CleanupGCEIngressController(gceController *GCEIngressController) {
// Logging that the GLBC failed to cleanup GCE resources on ingress deletion
// See kubernetes/ingress#431
if pollErr != nil {
Logf("error: L7 controller failed to delete all cloud resources on time. %v", pollErr)
return fmt.Errorf("error: L7 controller failed to delete all cloud resources on time. %v", pollErr)
}
return nil
}
func (cont *GCEIngressController) getL7AddonUID() (string, error) {
Logf("Retrieving UID from config map: %v/%v", metav1.NamespaceSystem, uidConfigMap)
cm, err := cont.Client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(uidConfigMap, metav1.GetOptions{})
if err != nil {
return "", err
}
if uid, ok := cm.Data[uidKey]; ok {
return uid, nil
}
return "", fmt.Errorf("Could not find cluster UID for L7 addon pod")
}
func (cont *GCEIngressController) ListGlobalForwardingRules() []*compute.ForwardingRule {
@ -905,9 +945,11 @@ func (cont *GCEIngressController) Cleanup(del bool) error {
}
// Init initializes the GCEIngressController with an UID
func (cont *GCEIngressController) Init() {
func (cont *GCEIngressController) Init() error {
uid, err := cont.getL7AddonUID()
Expect(err).NotTo(HaveOccurred())
if err != nil {
return err
}
cont.UID = uid
// There's a name limit imposed by GCE. The controller will truncate.
testName := fmt.Sprintf("k8s-fw-foo-app-X-%v--%v", cont.Ns, cont.UID)
@ -916,6 +958,7 @@ func (cont *GCEIngressController) Init() {
} else {
Logf("Detected cluster UID %v", cont.UID)
}
return nil
}
// CreateStaticIP allocates a random static ip with the given name. Returns a string
@ -1017,6 +1060,33 @@ func GcloudComputeResourceCreate(resource, name, project string, args ...string)
return err
}
// IngressTestJig holds the relevant state and parameters of the ingress test.
type IngressTestJig struct {
Client clientset.Interface
Logger TestLogger
RootCAs map[string][]byte
Address string
Ingress *extensions.Ingress
// class is the value of the annotation keyed under
// `kubernetes.io/ingress.class`. It's added to all ingresses created by
// this jig.
Class string
// The interval used to poll urls
PollInterval time.Duration
}
// NewIngressTestJig instantiates struct with client
func NewIngressTestJig(c clientset.Interface) *IngressTestJig {
return &IngressTestJig{
Client: c,
RootCAs: map[string][]byte{},
PollInterval: LoadBalancerPollInterval,
Logger: &E2ELogger{},
}
}
// CreateIngress creates the Ingress and associated service/rc.
// Required: ing.yaml, rc.yaml, svc.yaml must exist in manifestPath
// Optional: secret.yaml, ingAnnotations
@ -1028,10 +1098,10 @@ func (j *IngressTestJig) CreateIngress(manifestPath, ns string, ingAnnotations m
return filepath.Join(TestContext.RepoRoot, manifestPath, file)
}
Logf("creating replication controller")
j.Logger.Infof("creating replication controller")
RunKubectlOrDie("create", "-f", mkpath("rc.yaml"), fmt.Sprintf("--namespace=%v", ns))
Logf("creating service")
j.Logger.Infof("creating service")
RunKubectlOrDie("create", "-f", mkpath("svc.yaml"), fmt.Sprintf("--namespace=%v", ns))
if len(svcAnnotations) > 0 {
svcList, err := j.Client.CoreV1().Services(ns).List(metav1.ListOptions{})
@ -1044,10 +1114,10 @@ func (j *IngressTestJig) CreateIngress(manifestPath, ns string, ingAnnotations m
}
if exists, _ := utilfile.FileExists(mkpath("secret.yaml")); exists {
Logf("creating secret")
j.Logger.Infof("creating secret")
RunKubectlOrDie("create", "-f", mkpath("secret.yaml"), fmt.Sprintf("--namespace=%v", ns))
}
Logf("Parsing ingress from %v", filepath.Join(manifestPath, "ing.yaml"))
j.Logger.Infof("Parsing ingress from %v", filepath.Join(manifestPath, "ing.yaml"))
j.Ingress, err = manifest.IngressFromManifest(filepath.Join(manifestPath, "ing.yaml"))
ExpectNoError(err)
@ -1056,7 +1126,7 @@ func (j *IngressTestJig) CreateIngress(manifestPath, ns string, ingAnnotations m
for k, v := range ingAnnotations {
j.Ingress.Annotations[k] = v
}
Logf(fmt.Sprintf("creating" + j.Ingress.Name + " ingress"))
j.Logger.Infof(fmt.Sprintf("creating" + j.Ingress.Name + " ingress"))
j.Ingress, err = j.Client.ExtensionsV1beta1().Ingresses(ns).Create(j.Ingress)
ExpectNoError(err)
}
@ -1088,15 +1158,25 @@ func (j *IngressTestJig) AddHTTPS(secretName string, hosts ...string) {
j.Ingress.Spec.TLS = []extensions.IngressTLS{{Hosts: hosts, SecretName: secretName}}
// TODO: Just create the secret in GetRootCAs once we're watching secrets in
// the ingress controller.
_, cert, _, err := createIngressTLSSecret(j.Client, j.Ingress)
_, cert, _, err := createTLSSecret(j.Client, j.Ingress.Namespace, secretName, hosts...)
ExpectNoError(err)
Logf("Updating ingress %v to use secret %v for TLS termination", j.Ingress.Name, secretName)
j.Logger.Infof("Updating ingress %v to use secret %v for TLS termination", j.Ingress.Name, secretName)
j.Update(func(ing *extensions.Ingress) {
ing.Spec.TLS = []extensions.IngressTLS{{Hosts: hosts, SecretName: secretName}}
})
j.RootCAs[secretName] = cert
}
// PrepareTLSSecret creates a TLS secret and caches the cert.
func (j *IngressTestJig) PrepareTLSSecret(namespace, secretName string, hosts ...string) error {
_, cert, _, err := createTLSSecret(j.Client, namespace, secretName, hosts...)
if err != nil {
return err
}
j.RootCAs[secretName] = cert
return nil
}
// GetRootCA returns a rootCA from the ingress test jig.
func (j *IngressTestJig) GetRootCA(secretName string) (rootCA []byte) {
var ok bool
@ -1109,9 +1189,20 @@ func (j *IngressTestJig) GetRootCA(secretName string) (rootCA []byte) {
// TryDeleteIngress attempts to delete the ingress resource and logs errors if they occur.
func (j *IngressTestJig) TryDeleteIngress() {
err := j.Client.ExtensionsV1beta1().Ingresses(j.Ingress.Namespace).Delete(j.Ingress.Name, nil)
j.TryDeleteGivenIngress(j.Ingress)
}
func (j *IngressTestJig) TryDeleteGivenIngress(ing *extensions.Ingress) {
err := j.Client.ExtensionsV1beta1().Ingresses(ing.Namespace).Delete(ing.Name, nil)
if err != nil {
Logf("Error while deleting the ingress %v/%v: %v", j.Ingress.Namespace, j.Ingress.Name, err)
j.Logger.Infof("Error while deleting the ingress %v/%v: %v", ing.Namespace, ing.Name, err)
}
}
func (j *IngressTestJig) TryDeleteGivenService(svc *v1.Service) {
err := j.Client.CoreV1().Services(svc.Namespace).Delete(svc.Name, nil)
if err != nil {
j.Logger.Infof("Error while deleting the service %v/%v: %v", svc.Namespace, svc.Name, err)
}
}
@ -1134,12 +1225,12 @@ func getIngressAddress(client clientset.Interface, ns, name string) ([]string, e
}
// WaitForIngressAddress waits for the Ingress to acquire an address.
func WaitForIngressAddress(c clientset.Interface, ns, ingName string, timeout time.Duration) (string, error) {
func (j *IngressTestJig) WaitForIngressAddress(c clientset.Interface, ns, ingName string, timeout time.Duration) (string, error) {
var address string
err := wait.PollImmediate(10*time.Second, timeout, func() (bool, error) {
ipOrNameList, err := getIngressAddress(c, ns, ingName)
if err != nil || len(ipOrNameList) == 0 {
Logf("Waiting for Ingress %v to acquire IP, error %v", ingName, err)
j.Logger.Errorf("Waiting for Ingress %v to acquire IP, error %v", ingName, err)
if IsRetryableAPIError(err) {
return false, nil
}
@ -1151,52 +1242,62 @@ func WaitForIngressAddress(c clientset.Interface, ns, ingName string, timeout ti
return address, err
}
func (j *IngressTestJig) PollIngressWithCert(waitForNodePort bool, knownHosts []string, cert []byte) {
func (j *IngressTestJig) pollIngressWithCert(ing *extensions.Ingress, address string, knownHosts []string, cert []byte, waitForNodePort bool, timeout time.Duration) error {
// Check that all rules respond to a simple GET.
knownHostsSet := sets.NewString(knownHosts...)
for _, rules := range j.Ingress.Spec.Rules {
for _, rules := range ing.Spec.Rules {
timeoutClient := &http.Client{Timeout: IngressReqTimeout}
proto := "http"
if knownHostsSet.Has(rules.Host) {
var err error
// Create transport with cert to verify if the server uses the correct one.
timeoutClient.Transport, err = buildTransportWithCA(rules.Host, cert)
ExpectNoError(err)
if err != nil {
return err
}
proto = "https"
}
for _, p := range rules.IngressRuleValue.HTTP.Paths {
if waitForNodePort {
j.pollServiceNodePort(j.Ingress.Namespace, p.Backend.ServiceName, int(p.Backend.ServicePort.IntVal))
if err := j.pollServiceNodePort(ing.Namespace, p.Backend.ServiceName, int(p.Backend.ServicePort.IntVal)); err != nil {
return err
}
}
route := fmt.Sprintf("%v://%v%v", proto, address, p.Path)
j.Logger.Infof("Testing route %v host %v with simple GET", route, rules.Host)
if err := PollURL(route, rules.Host, timeout, j.PollInterval, timeoutClient, false); err != nil {
return err
}
route := fmt.Sprintf("%v://%v%v", proto, j.Address, p.Path)
Logf("Testing route %v host %v with simple GET", route, rules.Host)
ExpectNoError(PollURL(route, rules.Host, LoadBalancerPollTimeout, j.PollInterval, timeoutClient, false))
}
}
Logf("Finished polling on all rules on ingress %q", j.Ingress.Name)
j.Logger.Infof("Finished polling on all rules on ingress %q", ing.Name)
return nil
}
// WaitForIngress waits till the ingress acquires an IP, then waits for its
// hosts/urls to respond to a protocol check (either http or https). If
// waitForNodePort is true, the NodePort of the Service is verified before
// verifying the Ingress. NodePort is currently a requirement for cloudprovider
// Ingress.
func (j *IngressTestJig) WaitForIngress(waitForNodePort bool) {
j.WaitForGivenIngressWithTimeout(j.Ingress, waitForNodePort, LoadBalancerPollTimeout)
}
// WaitForGivenIngressWithTimeout waits till the ingress acquires an IP,
// then waits for its hosts/urls to respond to a protocol check (either
// http or https). If waitForNodePort is true, the NodePort of the Service
// is verified before verifying the Ingress. NodePort is currently a
// requirement for cloudprovider Ingress.
func (j *IngressTestJig) WaitForGivenIngressWithTimeout(ing *extensions.Ingress, waitForNodePort bool, timeout time.Duration) error {
// Wait for the loadbalancer IP.
address, err := WaitForIngressAddress(j.Client, j.Ingress.Namespace, j.Ingress.Name, LoadBalancerPollTimeout)
address, err := j.WaitForIngressAddress(j.Client, ing.Namespace, ing.Name, timeout)
if err != nil {
Failf("Ingress failed to acquire an IP address within %v", LoadBalancerPollTimeout)
return fmt.Errorf("Ingress failed to acquire an IP address within %v", timeout)
}
j.Address = address
Logf("Found address %v for ingress %v", j.Address, j.Ingress.Name)
j.Logger.Infof("Found address %v for ingress %v", address, ing.Name)
var knownHosts []string
var cert []byte
if len(j.Ingress.Spec.TLS) > 0 {
knownHosts = j.Ingress.Spec.TLS[0].Hosts
cert = j.GetRootCA(j.Ingress.Spec.TLS[0].SecretName)
if len(ing.Spec.TLS) > 0 {
knownHosts = ing.Spec.TLS[0].Hosts
cert = j.GetRootCA(ing.Spec.TLS[0].SecretName)
}
j.PollIngressWithCert(waitForNodePort, knownHosts, cert)
return j.pollIngressWithCert(ing, address, knownHosts, cert, waitForNodePort, timeout)
}
// WaitForIngress waits till the ingress acquires an IP, then waits for its
@ -1204,16 +1305,15 @@ func (j *IngressTestJig) WaitForIngress(waitForNodePort bool) {
// waitForNodePort is true, the NodePort of the Service is verified before
// verifying the Ingress. NodePort is currently a requirement for cloudprovider
// Ingress. Hostnames and certificate need to be explicitly passed in.
func (j *IngressTestJig) WaitForIngressWithCert(waitForNodePort bool, knownHosts []string, cert []byte) {
func (j *IngressTestJig) WaitForIngressWithCert(waitForNodePort bool, knownHosts []string, cert []byte) error {
// Wait for the loadbalancer IP.
address, err := WaitForIngressAddress(j.Client, j.Ingress.Namespace, j.Ingress.Name, LoadBalancerPollTimeout)
address, err := j.WaitForIngressAddress(j.Client, j.Ingress.Namespace, j.Ingress.Name, LoadBalancerPollTimeout)
if err != nil {
Failf("Ingress failed to acquire an IP address within %v", LoadBalancerPollTimeout)
return fmt.Errorf("Ingress failed to acquire an IP address within %v", LoadBalancerPollTimeout)
}
j.Address = address
Logf("Found address %v for ingress %v", j.Address, j.Ingress.Name)
j.Logger.Infof("Found address %v for ingress %v", address, j.Ingress.Name)
j.PollIngressWithCert(waitForNodePort, knownHosts, cert)
return j.pollIngressWithCert(j.Ingress, address, knownHosts, cert, waitForNodePort, LoadBalancerPollTimeout)
}
// VerifyURL polls for the given iterations, in intervals, and fails if the
@ -1225,17 +1325,19 @@ func (j *IngressTestJig) VerifyURL(route, host string, iterations int, interval
Logf(b)
return err
}
Logf("Verfied %v with host %v %d times, sleeping for %v", route, host, i, interval)
j.Logger.Infof("Verfied %v with host %v %d times, sleeping for %v", route, host, i, interval)
time.Sleep(interval)
}
return nil
}
func (j *IngressTestJig) pollServiceNodePort(ns, name string, port int) {
func (j *IngressTestJig) pollServiceNodePort(ns, name string, port int) error {
// TODO: Curl all nodes?
u, err := GetNodePortURL(j.Client, ns, name, port)
ExpectNoError(err)
ExpectNoError(PollURL(u, "", 30*time.Second, j.PollInterval, &http.Client{Timeout: IngressReqTimeout}, false))
if err != nil {
return err
}
return PollURL(u, "", 30*time.Second, j.PollInterval, &http.Client{Timeout: IngressReqTimeout}, false)
}
func (j *IngressTestJig) GetDefaultBackendNodePort() (int32, error) {
@ -1294,7 +1396,7 @@ func (j *IngressTestJig) ConstructFirewallForIngress(gceController *GCEIngressCo
// GetDistinctResponseFromIngress tries GET call to the ingress VIP and return all distinct responses.
func (j *IngressTestJig) GetDistinctResponseFromIngress() (sets.String, error) {
// Wait for the loadbalancer IP.
address, err := WaitForIngressAddress(j.Client, j.Ingress.Namespace, j.Ingress.Name, LoadBalancerPollTimeout)
address, err := j.WaitForIngressAddress(j.Client, j.Ingress.Namespace, j.Ingress.Name, LoadBalancerPollTimeout)
if err != nil {
Failf("Ingress failed to acquire an IP address within %v", LoadBalancerPollTimeout)
}
@ -1305,7 +1407,7 @@ func (j *IngressTestJig) GetDistinctResponseFromIngress() (sets.String, error) {
url := fmt.Sprintf("http://%v", address)
res, err := SimpleGET(timeoutClient, url, "")
if err != nil {
Logf("Failed to GET %q. Got responses: %q: %v", url, res, err)
j.Logger.Errorf("Failed to GET %q. Got responses: %q: %v", url, res, err)
return responses, err
}
responses.Insert(res)
@ -1313,35 +1415,6 @@ func (j *IngressTestJig) GetDistinctResponseFromIngress() (sets.String, error) {
return responses, nil
}
func (cont *GCEIngressController) getL7AddonUID() (string, error) {
Logf("Retrieving UID from config map: %v/%v", metav1.NamespaceSystem, uidConfigMap)
cm, err := cont.Client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(uidConfigMap, metav1.GetOptions{})
if err != nil {
return "", err
}
if uid, ok := cm.Data[uidKey]; ok {
return uid, nil
}
return "", fmt.Errorf("Could not find cluster UID for L7 addon pod")
}
// GCEIngressController manages implementation details of Ingress on GCE/GKE.
type GCEIngressController struct {
Ns string
rcPath string
UID string
staticIPName string
rc *v1.ReplicationController
svc *v1.Service
Client clientset.Interface
Cloud CloudConfig
}
// NewIngressTestJig instantiates struct with client
func NewIngressTestJig(c clientset.Interface) *IngressTestJig {
return &IngressTestJig{Client: c, RootCAs: map[string][]byte{}, PollInterval: LoadBalancerPollInterval}
}
// NginxIngressController manages implementation details of Ingress on Nginx.
type NginxIngressController struct {
Ns string

View File

@ -16,6 +16,7 @@ go_library(
"firewall.go",
"framework.go",
"ingress.go",
"ingress_scale.go",
"kube_proxy.go",
"network_policy.go",
"network_tiers.go",
@ -40,6 +41,7 @@ go_library(
"//pkg/master/ports:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/manifest:go_default_library",
"//test/e2e/network/scale:go_default_library",
"//test/images/net/nat:go_default_library",
"//test/utils:go_default_library",
"//test/utils/image:go_default_library",
@ -80,6 +82,9 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//test/e2e/network/scale:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -90,7 +90,8 @@ var _ = SIGDescribe("Loadbalancing: L7", func() {
Client: jig.Client,
Cloud: framework.TestContext.CloudConfig,
}
gceController.Init()
err := gceController.Init()
Expect(err).NotTo(HaveOccurred())
})
// Platform specific cleanup
@ -106,7 +107,7 @@ var _ = SIGDescribe("Loadbalancing: L7", func() {
jig.TryDeleteIngress()
By("Cleaning up cloud resources")
framework.CleanupGCEIngressController(gceController)
Expect(gceController.CleanupGCEIngressController()).NotTo(HaveOccurred())
})
It("should conform to Ingress spec", func() {
@ -356,7 +357,8 @@ var _ = SIGDescribe("Loadbalancing: L7", func() {
}, map[string]string{})
By("Test that ingress works with the pre-shared certificate")
jig.WaitForIngressWithCert(true, []string{testHostname}, cert)
err = jig.WaitForIngressWithCert(true, []string{testHostname}, cert)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Unexpected error while waiting for ingress: %v", err))
})
It("multicluster ingress should get instance group annotation", func() {
@ -398,7 +400,8 @@ var _ = SIGDescribe("Loadbalancing: L7", func() {
Client: jig.Client,
Cloud: framework.TestContext.CloudConfig,
}
gceController.Init()
err := gceController.Init()
Expect(err).NotTo(HaveOccurred())
})
// Platform specific cleanup
@ -414,7 +417,7 @@ var _ = SIGDescribe("Loadbalancing: L7", func() {
jig.TryDeleteIngress()
By("Cleaning up cloud resources")
framework.CleanupGCEIngressController(gceController)
Expect(gceController.CleanupGCEIngressController()).NotTo(HaveOccurred())
})
It("should conform to Ingress spec", func() {

View File

@ -0,0 +1,64 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
import (
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/network/scale"
. "github.com/onsi/ginkgo"
)
var _ = SIGDescribe("Loadbalancing: L7 Scalability", func() {
defer GinkgoRecover()
var (
ns string
)
f := framework.NewDefaultFramework("ingress-scale")
BeforeEach(func() {
ns = f.Namespace.Name
})
Describe("GCE [Slow] [Serial] [Feature:IngressScale]", func() {
var (
scaleFramework *scale.IngressScaleFramework
)
BeforeEach(func() {
framework.SkipUnlessProviderIs("gce", "gke")
scaleFramework = scale.NewIngressScaleFramework(f.ClientSet, ns, framework.TestContext.CloudConfig)
if err := scaleFramework.PrepareScaleTest(); err != nil {
framework.Failf("Unexpected error while preraring ingress scale test: %v", err)
}
})
AfterEach(func() {
if errs := scaleFramework.CleanupScaleTest(); len(errs) != 0 {
framework.Failf("Unexpected error while cleaning up ingress scale test: %v", errs)
}
})
It("Creating and updating ingresses should happen promptly with small/medium/large amount of ingresses", func() {
if errs := scaleFramework.RunScaleTest(); len(errs) != 0 {
framework.Failf("Unexpected error while running ingress scale test: %v", errs)
}
})
})
})

View File

@ -0,0 +1,33 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["ingress.go"],
importpath = "k8s.io/kubernetes/test/e2e/network/scale",
visibility = ["//visibility:public"],
deps = [
"//test/e2e/framework:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//test/e2e/network/scale/localrun:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,461 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scale
import (
"fmt"
"io/ioutil"
"sync"
"time"
"k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
)
const (
numIngressesSmall = 5
numIngressesMedium = 20
numIngressesLarge = 45
scaleTestIngressNamePrefix = "ing-scale"
scaleTestBackendName = "echoheaders-scale"
scaleTestSecretName = "tls-secret-scale"
scaleTestHostname = "scale.ingress.com"
scaleTestNumBackends = 10
scaleTestPollInterval = 15 * time.Second
// We don't expect waitForIngress to take longer
// than waitForIngressMaxTimeout.
waitForIngressMaxTimeout = 80 * time.Minute
ingressesCleanupTimeout = 80 * time.Minute
)
var (
scaleTestLabels = map[string]string{
"app": scaleTestBackendName,
}
)
// IngressScaleFramework defines the framework for ingress scale testing.
type IngressScaleFramework struct {
Clientset clientset.Interface
Jig *framework.IngressTestJig
GCEController *framework.GCEIngressController
CloudConfig framework.CloudConfig
Logger framework.TestLogger
Namespace string
EnableTLS bool
NumIngressesTest []int
OutputFile string
ScaleTestDeploy *extensions.Deployment
ScaleTestSvcs []*v1.Service
ScaleTestIngs []*extensions.Ingress
// BatchCreateLatencies stores all ingress creation latencies, in different
// batches.
BatchCreateLatencies [][]time.Duration
// BatchDurations stores the total duration for each ingress batch creation.
BatchDurations []time.Duration
// StepCreateLatencies stores the single ingress creation latency, which happens
// after each ingress batch creation is complete.
StepCreateLatencies []time.Duration
// StepCreateLatencies stores the single ingress update latency, which happens
// after each ingress batch creation is complete.
StepUpdateLatencies []time.Duration
}
// NewIngressScaleFramework returns a new framework for ingress scale testing.
func NewIngressScaleFramework(cs clientset.Interface, ns string, cloudConfig framework.CloudConfig) *IngressScaleFramework {
return &IngressScaleFramework{
Namespace: ns,
Clientset: cs,
CloudConfig: cloudConfig,
Logger: &framework.E2ELogger{},
EnableTLS: true,
NumIngressesTest: []int{
numIngressesSmall,
numIngressesMedium,
numIngressesLarge,
},
}
}
// PrepareScaleTest prepares framework for ingress scale testing.
func (f *IngressScaleFramework) PrepareScaleTest() error {
f.Logger.Infof("Initializing ingress test suite and gce controller...")
f.Jig = framework.NewIngressTestJig(f.Clientset)
f.Jig.Logger = f.Logger
f.Jig.PollInterval = scaleTestPollInterval
f.GCEController = &framework.GCEIngressController{
Client: f.Clientset,
Cloud: f.CloudConfig,
}
if err := f.GCEController.Init(); err != nil {
return fmt.Errorf("Failed to initialize GCE controller: %v", err)
}
f.ScaleTestSvcs = []*v1.Service{}
f.ScaleTestIngs = []*extensions.Ingress{}
return nil
}
// CleanupScaleTest cleans up framework for ingress scale testing.
func (f *IngressScaleFramework) CleanupScaleTest() []error {
var errs []error
f.Logger.Infof("Cleaning up ingresses...")
for _, ing := range f.ScaleTestIngs {
if ing != nil {
if err := f.Clientset.ExtensionsV1beta1().Ingresses(ing.Namespace).Delete(ing.Name, nil); err != nil {
errs = append(errs, fmt.Errorf("Error while deleting ingress %s/%s: %v", ing.Namespace, ing.Name, err))
}
}
}
f.Logger.Infof("Cleaning up services...")
for _, svc := range f.ScaleTestSvcs {
if svc != nil {
if err := f.Clientset.CoreV1().Services(svc.Namespace).Delete(svc.Name, nil); err != nil {
errs = append(errs, fmt.Errorf("Error while deleting service %s/%s: %v", svc.Namespace, svc.Name, err))
}
}
}
if f.ScaleTestDeploy != nil {
f.Logger.Infof("Cleaning up deployment %s...", f.ScaleTestDeploy.Name)
if err := f.Clientset.ExtensionsV1beta1().Deployments(f.ScaleTestDeploy.Namespace).Delete(f.ScaleTestDeploy.Name, nil); err != nil {
errs = append(errs, fmt.Errorf("Error while delting deployment %s/%s: %v", f.ScaleTestDeploy.Namespace, f.ScaleTestDeploy.Name, err))
}
}
f.Logger.Infof("Cleaning up cloud resources...")
if err := f.GCEController.CleanupGCEIngressControllerWithTimeout(ingressesCleanupTimeout); err != nil {
errs = append(errs, err)
}
return errs
}
// RunScaleTest runs ingress scale testing.
func (f *IngressScaleFramework) RunScaleTest() []error {
var errs []error
testDeploy := generateScaleTestBackendDeploymentSpec(scaleTestNumBackends)
f.Logger.Infof("Creating deployment %s...", testDeploy.Name)
testDeploy, err := f.Jig.Client.ExtensionsV1beta1().Deployments(f.Namespace).Create(testDeploy)
if err != nil {
errs = append(errs, fmt.Errorf("Failed to create deployment %s: %v", testDeploy.Name, err))
return errs
}
f.ScaleTestDeploy = testDeploy
if f.EnableTLS {
f.Logger.Infof("Ensuring TLS secret %s...", scaleTestSecretName)
if err := f.Jig.PrepareTLSSecret(f.Namespace, scaleTestSecretName, scaleTestHostname); err != nil {
errs = append(errs, fmt.Errorf("Failed to prepare TLS secret %s: %v", scaleTestSecretName, err))
return errs
}
}
// currentNum keeps track of how many ingresses have been created.
currentNum := new(int)
prepareIngsFunc := func(goalNum int) {
var ingWg sync.WaitGroup
numToCreate := goalNum - *currentNum
ingWg.Add(numToCreate)
errQueue := make(chan error, numToCreate)
latencyQueue := make(chan time.Duration, numToCreate)
start := time.Now()
for ; *currentNum < goalNum; *currentNum++ {
suffix := fmt.Sprintf("%d", *currentNum)
go func() {
defer ingWg.Done()
start := time.Now()
svcCreated, ingCreated, err := f.createScaleTestServiceIngress(suffix, f.EnableTLS)
f.ScaleTestSvcs = append(f.ScaleTestSvcs, svcCreated)
f.ScaleTestIngs = append(f.ScaleTestIngs, ingCreated)
if err != nil {
errQueue <- err
return
}
f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
if err := f.Jig.WaitForGivenIngressWithTimeout(ingCreated, false, waitForIngressMaxTimeout); err != nil {
errQueue <- err
return
}
elapsed := time.Since(start)
f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
latencyQueue <- elapsed
}()
}
// Wait until all ingress creations are complete.
f.Logger.Infof("Waiting for %d ingresses to come up...", numToCreate)
ingWg.Wait()
close(errQueue)
close(latencyQueue)
elapsed := time.Since(start)
var createLatencies []time.Duration
for latency := range latencyQueue {
createLatencies = append(createLatencies, latency)
}
f.BatchCreateLatencies = append(f.BatchCreateLatencies, createLatencies)
if len(errQueue) != 0 {
f.Logger.Errorf("Failed while creating services and ingresses, spent %v", elapsed)
for err := range errQueue {
errs = append(errs, err)
}
return
}
f.Logger.Infof("Spent %s for %d ingresses to come up", elapsed, numToCreate)
f.BatchDurations = append(f.BatchDurations, elapsed)
}
measureCreateUpdateFunc := func() {
f.Logger.Infof("Create one more ingress and wait for it to come up")
start := time.Now()
svcCreated, ingCreated, err := f.createScaleTestServiceIngress(fmt.Sprintf("%d", *currentNum), f.EnableTLS)
*currentNum = *currentNum + 1
f.ScaleTestSvcs = append(f.ScaleTestSvcs, svcCreated)
f.ScaleTestIngs = append(f.ScaleTestIngs, ingCreated)
if err != nil {
errs = append(errs, err)
return
}
f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
if err := f.Jig.WaitForGivenIngressWithTimeout(ingCreated, false, waitForIngressMaxTimeout); err != nil {
errs = append(errs, err)
return
}
elapsed := time.Since(start)
f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
f.StepCreateLatencies = append(f.StepCreateLatencies, elapsed)
f.Logger.Infof("Updating ingress and wait for change to take effect")
ingToUpdate, err := f.Clientset.ExtensionsV1beta1().Ingresses(f.Namespace).Get(ingCreated.Name, metav1.GetOptions{})
if err != nil {
errs = append(errs, err)
return
}
addTestPathToIngress(ingToUpdate)
start = time.Now()
ingToUpdate, err = f.Clientset.ExtensionsV1beta1().Ingresses(f.Namespace).Update(ingToUpdate)
if err != nil {
errs = append(errs, err)
return
}
if err := f.Jig.WaitForGivenIngressWithTimeout(ingToUpdate, false, waitForIngressMaxTimeout); err != nil {
errs = append(errs, err)
return
}
elapsed = time.Since(start)
f.Logger.Infof("Spent %s for updating ingress %s", elapsed, ingToUpdate.Name)
f.StepUpdateLatencies = append(f.StepUpdateLatencies, elapsed)
}
defer f.dumpLatencies()
for _, num := range f.NumIngressesTest {
f.Logger.Infof("Create more ingresses until we reach %d ingresses", num)
prepareIngsFunc(num)
f.Logger.Infof("Measure create and update latency with %d ingresses", num)
measureCreateUpdateFunc()
if len(errs) != 0 {
return errs
}
}
return errs
}
func (f *IngressScaleFramework) dumpLatencies() {
f.Logger.Infof("Dumping scale test latencies...")
formattedData := f.GetFormattedLatencies()
if f.OutputFile != "" {
f.Logger.Infof("Dumping scale test latencies to file %s...", f.OutputFile)
ioutil.WriteFile(f.OutputFile, []byte(formattedData), 0644)
return
}
f.Logger.Infof("\n%v", formattedData)
}
// GetFormattedLatencies returns the formatted latencies output.
// TODO: Need a better way/format for data output.
func (f *IngressScaleFramework) GetFormattedLatencies() string {
if len(f.NumIngressesTest) == 0 ||
len(f.NumIngressesTest) != len(f.BatchCreateLatencies) ||
len(f.NumIngressesTest) != len(f.BatchDurations) ||
len(f.NumIngressesTest) != len(f.StepCreateLatencies) ||
len(f.NumIngressesTest) != len(f.StepUpdateLatencies) {
return "Failed to construct latencies output."
}
res := "--- Procedure logs ---\n"
for i, latencies := range f.BatchCreateLatencies {
res += fmt.Sprintf("Create %d ingresses parallelly, each of them takes below amount of time before starts serving traffic:\n", len(latencies))
for _, latency := range latencies {
res = res + fmt.Sprintf("- %v\n", latency)
}
res += fmt.Sprintf("Total duration for completing %d ingress creations: %v\n", len(latencies), f.BatchDurations[i])
res += fmt.Sprintf("Duration to create one more ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
res += fmt.Sprintf("Duration to update one ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
}
res = res + "--- Summary ---\n"
var batchTotalStr, batchAvgStr, singleCreateStr, singleUpdateStr string
for i, latencies := range f.BatchCreateLatencies {
batchTotalStr += fmt.Sprintf("Batch creation total latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), f.BatchDurations[i])
var avgLatency time.Duration
for _, latency := range latencies {
avgLatency = avgLatency + latency
}
avgLatency /= time.Duration(len(latencies))
batchAvgStr += fmt.Sprintf("Batch creation average latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), avgLatency)
singleCreateStr += fmt.Sprintf("Single ingress creation latency with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
singleUpdateStr += fmt.Sprintf("Single ingress update latency with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
}
res += batchTotalStr + batchAvgStr + singleCreateStr + singleUpdateStr
return res
}
func addTestPathToIngress(ing *extensions.Ingress) {
ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths = append(
ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths,
extensions.HTTPIngressPath{
Path: "/test",
Backend: ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths[0].Backend,
})
}
func (f *IngressScaleFramework) createScaleTestServiceIngress(suffix string, enableTLS bool) (*v1.Service, *extensions.Ingress, error) {
svcCreated, err := f.Clientset.CoreV1().Services(f.Namespace).Create(generateScaleTestServiceSpec(suffix))
if err != nil {
return nil, nil, err
}
ingCreated, err := f.Clientset.ExtensionsV1beta1().Ingresses(f.Namespace).Create(generateScaleTestIngressSpec(suffix, enableTLS))
if err != nil {
return nil, nil, err
}
return svcCreated, ingCreated, nil
}
func generateScaleTestIngressSpec(suffix string, enableTLS bool) *extensions.Ingress {
ing := &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", scaleTestIngressNamePrefix, suffix),
},
Spec: extensions.IngressSpec{
TLS: []extensions.IngressTLS{
{SecretName: scaleTestSecretName},
},
Rules: []extensions.IngressRule{
{
Host: scaleTestHostname,
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Path: "/scale",
Backend: extensions.IngressBackend{
ServiceName: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
ServicePort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
},
},
},
},
},
},
}
if enableTLS {
ing.Spec.TLS = []extensions.IngressTLS{
{SecretName: scaleTestSecretName},
}
}
return ing
}
func generateScaleTestServiceSpec(suffix string) *v1.Service {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
Labels: scaleTestLabels,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: "http",
Protocol: v1.ProtocolTCP,
Port: 80,
TargetPort: intstr.FromInt(8080),
}},
Selector: scaleTestLabels,
Type: v1.ServiceTypeNodePort,
},
}
}
func generateScaleTestBackendDeploymentSpec(numReplicas int32) *extensions.Deployment {
return &extensions.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: scaleTestBackendName,
},
Spec: extensions.DeploymentSpec{
Replicas: &numReplicas,
Selector: &metav1.LabelSelector{MatchLabels: scaleTestLabels},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: scaleTestLabels,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: scaleTestBackendName,
Image: "gcr.io/google_containers/echoserver:1.6",
Ports: []v1.ContainerPort{{ContainerPort: 8080}},
ReadinessProbe: &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Port: intstr.FromInt(8080),
Path: "/healthz",
},
},
FailureThreshold: 10,
PeriodSeconds: 1,
SuccessThreshold: 1,
TimeoutSeconds: 1,
},
},
},
},
},
},
}
}

View File

@ -0,0 +1,39 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
go_library(
name = "go_default_library",
srcs = ["ingress_scale.go"],
importpath = "k8s.io/kubernetes/test/e2e/network/scale/localrun",
visibility = ["//visibility:private"],
deps = [
"//pkg/cloudprovider/providers/gce:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e/network/scale:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
],
)
go_binary(
name = "localrun",
embed = [":go_default_library"],
importpath = "k8s.io/kubernetes/test/e2e/network/scale/localrun",
visibility = ["//visibility:public"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,185 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/network/scale"
)
var (
kubeconfig string
enableTLS bool
numIngressesTest numIngressesSlice
testNamespace string
cloudConfig framework.CloudConfig
outputFile string
cleanup bool
)
type numIngressesSlice []int
func (i *numIngressesSlice) String() string {
return fmt.Sprintf("%d", *i)
}
func (i *numIngressesSlice) Set(value string) error {
v, err := strconv.Atoi(value)
if err != nil {
return err
}
*i = append(*i, v)
sort.Ints(*i)
return nil
}
func registerFlags() {
if home := os.Getenv("HOME"); home != "" {
flag.StringVar(&kubeconfig, "kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) Absolute path to the kubeconfig file")
} else {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file")
}
flag.StringVar(&cloudConfig.ProjectID, "project", "", "GCE project being used")
flag.StringVar(&cloudConfig.Zone, "zone", "", "GCE zone being used")
flag.StringVar(&cloudConfig.Region, "region", "", "GCE region being used")
flag.Var(&numIngressesTest, "num-ingresses", "The number of ingresses to test, specify multiple times for step testing (e.g. 5 ingresses -> 20 ingresses -> 100 ingresses)")
flag.BoolVar(&enableTLS, "enable-tls", true, "Whether to enable TLS on ingress")
flag.StringVar(&testNamespace, "namespace", "ingress-test-scale", "Namespace for testing")
flag.StringVar(&outputFile, "output", "", "If specify, dump latencies to the specified file")
flag.BoolVar(&cleanup, "cleanup", true, "Whether to cleanup resources after test")
}
func verifyFlags() error {
if cloudConfig.ProjectID == "" || cloudConfig.Zone == "" || cloudConfig.Region == "" {
return fmt.Errorf("must set all of --project, --zone and --region")
}
return nil
}
func main() {
registerFlags()
flag.Parse()
if err := verifyFlags(); err != nil {
glog.Errorf("Failed to verify flags: %v", err)
os.Exit(1)
}
// Initializing a k8s client.
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
glog.Errorf("Failed to build kubeconfig: %v", err)
os.Exit(1)
}
cs, err := clientset.NewForConfig(config)
if err != nil {
glog.Errorf("Failed to create kubeclient: %v", err)
os.Exit(1)
}
// Initializing a GCE client.
gceAlphaFeatureGate, err := gcecloud.NewAlphaFeatureGate([]string{})
if err != nil {
glog.Errorf("Encountered error for creating alpha feature gate: %v", err)
os.Exit(1)
}
gceCloud, err := gcecloud.CreateGCECloud(&gcecloud.CloudConfig{
ProjectID: cloudConfig.ProjectID,
Region: cloudConfig.Region,
Zone: cloudConfig.Zone,
AlphaFeatureGate: gceAlphaFeatureGate,
})
if err != nil {
glog.Errorf("Error building GCE provider: %v", err)
os.Exit(1)
}
cloudConfig.Provider = gceCloud
testSuccessFlag := true
defer func() {
if !testSuccessFlag {
glog.Errorf("Ingress scale test failed.")
os.Exit(1)
}
}()
ns := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: testNamespace,
},
}
glog.Infof("Creating namespace %s...", ns.Name)
if _, err := cs.CoreV1().Namespaces().Create(ns); err != nil {
glog.Errorf("Failed to create namespace %s: %v", ns.Name, err)
testSuccessFlag = false
return
}
if cleanup {
defer func() {
glog.Infof("Deleting namespace %s...", ns.Name)
if err := cs.CoreV1().Namespaces().Delete(ns.Name, nil); err != nil {
glog.Errorf("Failed to delete namespace %s: %v", ns.Name, err)
testSuccessFlag = false
}
}()
}
// Setting up a localized scale test framework.
f := scale.NewIngressScaleFramework(cs, ns.Name, cloudConfig)
f.Logger = &framework.GLogger{}
// Customizing scale test.
f.EnableTLS = enableTLS
f.OutputFile = outputFile
if len(numIngressesTest) != 0 {
f.NumIngressesTest = numIngressesTest
}
// Real test begins.
if cleanup {
defer func() {
if errs := f.CleanupScaleTest(); len(errs) != 0 {
glog.Errorf("Failed to cleanup scale test: %v", errs)
testSuccessFlag = false
}
}()
}
err = f.PrepareScaleTest()
if err != nil {
glog.Errorf("Failed to prepare scale test: %v", err)
testSuccessFlag = false
return
}
if errs := f.RunScaleTest(); len(errs) != 0 {
glog.Errorf("Failed while running scale test: %v", errs)
testSuccessFlag = false
}
}

View File

@ -81,7 +81,7 @@ func (t *IngressUpgradeTest) Setup(f *framework.Framework) {
Client: jig.Client,
Cloud: framework.TestContext.CloudConfig,
}
gceController.Init()
framework.ExpectNoError(gceController.Init())
t.gceController = gceController
t.jig = jig
@ -142,7 +142,7 @@ func (t *IngressUpgradeTest) Teardown(f *framework.Framework) {
}
By("Cleaning up cloud resources")
framework.CleanupGCEIngressController(t.gceController)
framework.ExpectNoError(t.gceController.CleanupGCEIngressController())
}
func (t *IngressUpgradeTest) verify(f *framework.Framework, done <-chan struct{}, testDuringDisruption bool) {