From c025d771f5f0de02fb0f465d7f760c3e8153529c Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Fri, 17 Feb 2017 12:02:55 +0100 Subject: [PATCH] Refactor cluster logging tests and add load tests --- test/e2e/BUILD | 6 +- test/e2e/cluster_logging_es.go | 257 ++------------------------ test/e2e/cluster_logging_es_utils.go | 240 ++++++++++++++++++++++++ test/e2e/cluster_logging_gcl.go | 110 ++--------- test/e2e/cluster_logging_gcl_load.go | 105 +++++++++++ test/e2e/cluster_logging_gcl_utils.go | 127 +++++++++++++ test/e2e/cluster_logging_utils.go | 212 +++++++++++++++++---- test/test_owners.csv | 2 + 8 files changed, 692 insertions(+), 367 deletions(-) create mode 100644 test/e2e/cluster_logging_es_utils.go create mode 100644 test/e2e/cluster_logging_gcl_load.go create mode 100644 test/e2e/cluster_logging_gcl_utils.go diff --git a/test/e2e/BUILD b/test/e2e/BUILD index 7d6d5b35808..cd65a5853fc 100644 --- a/test/e2e/BUILD +++ b/test/e2e/BUILD @@ -16,7 +16,10 @@ go_library( "autoscaling_utils.go", "cadvisor.go", "cluster_logging_es.go", + "cluster_logging_es_utils.go", "cluster_logging_gcl.go", + "cluster_logging_gcl_load.go", + "cluster_logging_gcl_utils.go", "cluster_logging_utils.go", "cluster_size_autoscaling.go", "cluster_upgrade.go", @@ -178,8 +181,10 @@ go_library( "//vendor:golang.org/x/crypto/ssh", "//vendor:golang.org/x/net/context", "//vendor:golang.org/x/net/websocket", + "//vendor:golang.org/x/oauth2/google", "//vendor:google.golang.org/api/compute/v1", "//vendor:google.golang.org/api/googleapi", + "//vendor:google.golang.org/api/logging/v2beta1", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", @@ -189,7 +194,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", - "//vendor:k8s.io/apimachinery/pkg/util/json", "//vendor:k8s.io/apimachinery/pkg/util/net", 
"//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/sets", diff --git a/test/e2e/cluster_logging_es.go b/test/e2e/cluster_logging_es.go index 110952440f1..d9199454a11 100644 --- a/test/e2e/cluster_logging_es.go +++ b/test/e2e/cluster_logging_es.go @@ -17,24 +17,13 @@ limitations under the License. package e2e import ( - "context" - "encoding/json" "fmt" - "strconv" - "strings" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/test/e2e/framework" . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -const ( - // graceTime is how long to keep retrying requesting elasticsearch for status information. - graceTime = 5 * time.Minute ) var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Feature:Elasticsearch]", func() { @@ -48,241 +37,25 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu }) It("should check that logs from containers are ingested into Elasticsearch", func() { - err := checkElasticsearchReadiness(f) - framework.ExpectNoError(err, "Elasticsearch failed to start") + podName := "synthlogger" + esLogsProvider, err := newEsLogsProvider(f) + framework.ExpectNoError(err, "Failed to create GCL logs provider") + + err = esLogsProvider.EnsureWorking() + framework.ExpectNoError(err, "Elasticsearch is not working") By("Running synthetic logger") - createSynthLogger(f, expectedLinesCount) - defer f.PodClient().Delete(synthLoggerPodName, &metav1.DeleteOptions{}) - err = framework.WaitForPodSuccessInNamespace(f.ClientSet, synthLoggerPodName, f.Namespace.Name) - framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", synthLoggerPodName)) + pod := createLoggingPod(f, podName, 100, 1*time.Second) + defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) + err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, 
f.Namespace.Name) + framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName)) By("Waiting for logs to ingest") - totalMissing := expectedLinesCount - for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) { - totalMissing, err = getMissingLinesCountElasticsearch(f, expectedLinesCount) - if err != nil { - framework.Logf("Failed to get missing lines count due to %v", err) - totalMissing = expectedLinesCount - } else if totalMissing > 0 { - framework.Logf("Still missing %d lines", totalMissing) - } + err = waitForLogsIngestion(esLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0) + framework.ExpectNoError(err, "Failed to ingest logs") - if totalMissing == 0 { - break - } + if err != nil { + reportLogsFromFluentdPod(f, pod) } - - if totalMissing > 0 { - if err := reportLogsFromFluentdPod(f); err != nil { - framework.Logf("Failed to report logs from fluentd pod due to %v", err) - } - } - - Expect(totalMissing).To(Equal(0), "Some log lines are still missing") }) }) - -// Ensures that elasticsearch is running and ready to serve requests -func checkElasticsearchReadiness(f *framework.Framework) error { - // Check for the existence of the Elasticsearch service. - By("Checking the Elasticsearch service exists.") - s := f.ClientSet.Core().Services(metav1.NamespaceSystem) - // Make a few attempts to connect. This makes the test robust against - // being run as the first e2e test just after the e2e cluster has been created. - var err error - for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) { - if _, err = s.Get("elasticsearch-logging", metav1.GetOptions{}); err == nil { - break - } - framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start)) - } - Expect(err).NotTo(HaveOccurred()) - - // Wait for the Elasticsearch pods to enter the running state. 
- By("Checking to make sure the Elasticsearch pods are running") - label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "elasticsearch-logging"})) - options := metav1.ListOptions{LabelSelector: label.String()} - pods, err := f.ClientSet.Core().Pods(metav1.NamespaceSystem).List(options) - Expect(err).NotTo(HaveOccurred()) - for _, pod := range pods.Items { - err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod) - Expect(err).NotTo(HaveOccurred()) - } - - By("Checking to make sure we are talking to an Elasticsearch service.") - // Perform a few checks to make sure this looks like an Elasticsearch cluster. - var statusCode int - err = nil - var body []byte - for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) { - proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) - if errProxy != nil { - framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy) - continue - } - - ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout) - defer cancel() - - // Query against the root URL for Elasticsearch. - response := proxyRequest.Namespace(metav1.NamespaceSystem). - Context(ctx). - Name("elasticsearch-logging"). 
- Do() - err = response.Error() - response.StatusCode(&statusCode) - - if err != nil { - if ctx.Err() != nil { - framework.Failf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err) - continue - } - framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err) - continue - } - if int(statusCode) != 200 { - framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode) - continue - } - break - } - Expect(err).NotTo(HaveOccurred()) - if int(statusCode) != 200 { - framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode) - } - - // Now assume we really are talking to an Elasticsearch instance. - // Check the cluster health. - By("Checking health of Elasticsearch service.") - healthy := false - for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) { - proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) - if errProxy != nil { - framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy) - continue - } - - ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout) - defer cancel() - - body, err = proxyRequest.Namespace(metav1.NamespaceSystem). - Context(ctx). - Name("elasticsearch-logging"). - Suffix("_cluster/health"). - Param("level", "indices"). 
- DoRaw() - if err != nil { - if ctx.Err() != nil { - framework.Failf("Failed to get cluster health from elasticsearch: %v", err) - } - continue - } - health := make(map[string]interface{}) - err := json.Unmarshal(body, &health) - if err != nil { - framework.Logf("Bad json response from elasticsearch: %v", err) - continue - } - statusIntf, ok := health["status"] - if !ok { - framework.Logf("No status field found in cluster health response: %v", health) - continue - } - status := statusIntf.(string) - if status != "green" && status != "yellow" { - framework.Logf("Cluster health has bad status: %v", health) - continue - } - if err == nil && ok { - healthy = true - break - } - } - if !healthy { - return fmt.Errorf("After %v elasticsearch cluster is not healthy", graceTime) - } - - return nil -} - -func getMissingLinesCountElasticsearch(f *framework.Framework, expectedCount int) (int, error) { - proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) - if errProxy != nil { - return 0, fmt.Errorf("Failed to get services proxy request: %v", errProxy) - } - - ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout) - defer cancel() - - // Ask Elasticsearch to return all the log lines that were tagged with the - // pod name. Ask for ten times as many log lines because duplication is possible. - body, err := proxyRequest.Namespace(metav1.NamespaceSystem). - Context(ctx). - Name("elasticsearch-logging"). - Suffix("_search"). - // TODO: Change filter to only match records from current test run - // after fluent-plugin-kubernetes_metadata_filter is enabled - // and optimize current query - Param("q", fmt.Sprintf("tag:*%s*", synthLoggerPodName)). - Param("size", strconv.Itoa(expectedCount*10)). 
- DoRaw() - if err != nil { - if ctx.Err() != nil { - framework.Failf("Failed to make proxy call to elasticsearch-logging: %v", err) - } - return 0, fmt.Errorf("Failed to make proxy call to elasticsearch-logging: %v", err) - } - - var response map[string]interface{} - err = json.Unmarshal(body, &response) - if err != nil { - return 0, fmt.Errorf("Failed to unmarshal response: %v", err) - } - - hits, ok := response["hits"].(map[string]interface{}) - if !ok { - return 0, fmt.Errorf("response[hits] not of the expected type: %T", response["hits"]) - } - - h, ok := hits["hits"].([]interface{}) - if !ok { - return 0, fmt.Errorf("Hits not of the expected type: %T", hits["hits"]) - } - - // Initialize data-structure for observing counts. - counts := make(map[int]int) - - // Iterate over the hits and populate the observed array. - for _, e := range h { - l, ok := e.(map[string]interface{}) - if !ok { - framework.Logf("Element of hit not of expected type: %T", e) - continue - } - source, ok := l["_source"].(map[string]interface{}) - if !ok { - framework.Logf("_source not of the expected type: %T", l["_source"]) - continue - } - msg, ok := source["log"].(string) - if !ok { - framework.Logf("Log not of the expected type: %T", source["log"]) - continue - } - lineNumber, err := strconv.Atoi(strings.TrimSpace(msg)) - if err != nil { - framework.Logf("Log line %s is not a number", msg) - continue - } - if lineNumber < 0 || lineNumber >= expectedCount { - framework.Logf("Number %d is not valid, expected number from range [0, %d)", lineNumber, expectedCount) - continue - } - // Record the observation of a log line - // Duplicates are possible and fine, fluentd has at-least-once delivery - counts[lineNumber]++ - } - - return expectedCount - len(counts), nil -} diff --git a/test/e2e/cluster_logging_es_utils.go b/test/e2e/cluster_logging_es_utils.go new file mode 100644 index 00000000000..e9704a6038e --- /dev/null +++ b/test/e2e/cluster_logging_es_utils.go @@ -0,0 +1,240 @@ +/* 
+Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "encoding/json" + "fmt" + "strconv" + "time" + + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +const ( + // esRetryTimeout is how long to keep retrying requesting elasticsearch for status information. + esRetryTimeout = 5 * time.Minute + // esRetryDelay is how much time to wait between two attempts to send a request to elasticsearch + esRetryDelay = 5 * time.Second +) + +type esLogsProvider struct { + Framework *framework.Framework +} + +func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) { + return &esLogsProvider{Framework: f}, nil +} + +// Ensures that elasticsearch is running and ready to serve requests +func (logsProvider *esLogsProvider) EnsureWorking() error { + f := logsProvider.Framework + // Check for the existence of the Elasticsearch service. + By("Checking the Elasticsearch service exists.") + s := f.ClientSet.Core().Services(api.NamespaceSystem) + // Make a few attempts to connect. This makes the test robust against + // being run as the first e2e test just after the e2e cluster has been created. 
+ var err error + for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) { + if _, err = s.Get("elasticsearch-logging", meta_v1.GetOptions{}); err == nil { + break + } + framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start)) + } + Expect(err).NotTo(HaveOccurred()) + + // Wait for the Elasticsearch pods to enter the running state. + By("Checking to make sure the Elasticsearch pods are running") + labelSelector := fields.SelectorFromSet(fields.Set(map[string]string{"k8s-app": "elasticsearch-logging"})).String() + options := meta_v1.ListOptions{LabelSelector: labelSelector} + pods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options) + Expect(err).NotTo(HaveOccurred()) + for _, pod := range pods.Items { + err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod) + Expect(err).NotTo(HaveOccurred()) + } + + By("Checking to make sure we are talking to an Elasticsearch service.") + // Perform a few checks to make sure this looks like an Elasticsearch cluster. + var statusCode int + err = nil + var body []byte + for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) { + proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) + if errProxy != nil { + framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy) + continue + } + // Query against the root URL for Elasticsearch. + response := proxyRequest.Namespace(api.NamespaceSystem). + Name("elasticsearch-logging"). 
+ Do() + err = response.Error() + response.StatusCode(&statusCode) + + if err != nil { + framework.Logf("After %v proxy call to elasticsearch-logging failed: %v", time.Since(start), err) + continue + } + if int(statusCode) != 200 { + framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode) + continue + } + break + } + Expect(err).NotTo(HaveOccurred()) + if int(statusCode) != 200 { + framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode) + } + + // Now assume we really are talking to an Elasticsearch instance. + // Check the cluster health. + By("Checking health of Elasticsearch service.") + healthy := false + for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) { + proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) + if errProxy != nil { + framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy) + continue + } + body, err = proxyRequest.Namespace(api.NamespaceSystem). + Name("elasticsearch-logging"). + Suffix("_cluster/health"). + Param("level", "indices"). 
+ DoRaw() + if err != nil { + continue + } + health := make(map[string]interface{}) + err := json.Unmarshal(body, &health) + if err != nil { + framework.Logf("Bad json response from elasticsearch: %v", err) + continue + } + statusIntf, ok := health["status"] + if !ok { + framework.Logf("No status field found in cluster health response: %v", health) + continue + } + status := statusIntf.(string) + if status != "green" && status != "yellow" { + framework.Logf("Cluster health has bad status: %v", health) + continue + } + if err == nil && ok { + healthy = true + break + } + } + if !healthy { + return fmt.Errorf("after %v elasticsearch cluster is not healthy", esRetryTimeout) + } + + return nil +} + +func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry { + f := logsProvider.Framework + + proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) + if errProxy != nil { + framework.Logf("Failed to get services proxy request: %v", errProxy) + return nil + } + + // Ask Elasticsearch to return all the log lines that were tagged with the + // pod name. Ask for ten times as many log lines because duplication is possible. + body, err := proxyRequest.Namespace(api.NamespaceSystem). + Name("elasticsearch-logging"). + Suffix("_search"). + // TODO: Change filter to only match records from current test run + // after fluent-plugin-kubernetes_metadata_filter is enabled + // and optimize current query + Param("q", fmt.Sprintf("tag:*%s*", pod.Name)). + // Ask for more in case we included some unrelated records in our query + Param("size", strconv.Itoa(pod.ExpectedLinesNumber*10)). 
+ DoRaw() + if err != nil { + framework.Logf("Failed to make proxy call to elasticsearch-logging: %v", err) + return nil + } + + var response map[string]interface{} + err = json.Unmarshal(body, &response) + if err != nil { + framework.Logf("Failed to unmarshal response: %v", err) + return nil + } + + hits, ok := response["hits"].(map[string]interface{}) + if !ok { + framework.Logf("response[hits] not of the expected type: %T", response["hits"]) + return nil + } + + h, ok := hits["hits"].([]interface{}) + if !ok { + framework.Logf("Hits not of the expected type: %T", hits["hits"]) + return nil + } + + entries := []*logEntry{} + // Iterate over the hits and populate the observed array. + for _, e := range h { + l, ok := e.(map[string]interface{}) + if !ok { + framework.Logf("Element of hit not of expected type: %T", e) + continue + } + + source, ok := l["_source"].(map[string]interface{}) + if !ok { + framework.Logf("_source not of the expected type: %T", l["_source"]) + continue + } + + msg, ok := source["log"].(string) + if !ok { + framework.Logf("Log not of the expected type: %T", source["log"]) + continue + } + + timestampString, ok := source["@timestamp"].(string) + if !ok { + framework.Logf("Timestamp not of the expected type: %T", source["@timestamp"]) + continue + } + timestamp, err := time.Parse(time.RFC3339, timestampString) + if err != nil { + framework.Logf("Timestamp was not in correct format: %s", timestampString) + continue + } + + entries = append(entries, &logEntry{ + Payload: msg, + Timestamp: timestamp, + }) + } + + return entries +} diff --git a/test/e2e/cluster_logging_gcl.go b/test/e2e/cluster_logging_gcl.go index 71282aebbd3..a8fb1b71644 100644 --- a/test/e2e/cluster_logging_gcl.go +++ b/test/e2e/cluster_logging_gcl.go @@ -18,17 +18,12 @@ package e2e import ( "fmt" - "os/exec" - "strconv" - "strings" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/json" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
"k8s.io/kubernetes/test/e2e/framework" . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" ) var _ = framework.KubeDescribe("Cluster level logging using GCL", func() { @@ -39,97 +34,26 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL", func() { }) It("should check that logs from containers are ingested in GCL", func() { + podName := "synthlogger" + + gclLogsProvider, err := newGclLogsProvider(f) + framework.ExpectNoError(err, "Failed to create GCL logs provider") + + err = gclLogsProvider.EnsureWorking() + framework.ExpectNoError(err, "GCL is not working") + By("Running synthetic logger") - createSynthLogger(f, expectedLinesCount) - defer f.PodClient().Delete(synthLoggerPodName, &metav1.DeleteOptions{}) - err := framework.WaitForPodSuccessInNamespace(f.ClientSet, synthLoggerPodName, f.Namespace.Name) - framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", synthLoggerPodName)) + pod := createLoggingPod(f, podName, 100, 1*time.Second) + defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) + err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name) + framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName)) By("Waiting for logs to ingest") - totalMissing := expectedLinesCount - for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) { - var err error - totalMissing, err = getMissingLinesCountGcl(f, synthLoggerPodName, expectedLinesCount) - if err != nil { - framework.Logf("Failed to get missing lines count due to %v", err) - totalMissing = expectedLinesCount - } else if totalMissing > 0 { - framework.Logf("Still missing %d lines", totalMissing) - } + err = waitForLogsIngestion(gclLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0) + framework.ExpectNoError(err, "Failed to ingest logs") - if totalMissing == 0 { - break - } + if err != nil { + reportLogsFromFluentdPod(f, pod) } - - if 
totalMissing > 0 { - if err := reportLogsFromFluentdPod(f); err != nil { - framework.Logf("Failed to report logs from fluentd pod due to %v", err) - } - } - - Expect(totalMissing).To(Equal(0), "Some log lines are still missing") }) }) - -func getMissingLinesCountGcl(f *framework.Framework, podName string, expectedCount int) (int, error) { - gclFilter := fmt.Sprintf("resource.labels.pod_id:%s AND resource.labels.namespace_id:%s", podName, f.Namespace.Name) - entries, err := readFilteredEntriesFromGcl(gclFilter) - if err != nil { - return 0, err - } - - occurrences := make(map[int]int) - for _, entry := range entries { - lineNumber, err := strconv.Atoi(strings.TrimSpace(entry)) - if err != nil { - continue - } - if lineNumber < 0 || lineNumber >= expectedCount { - framework.Logf("Unexpected line number: %d", lineNumber) - } else { - // Duplicates are possible and fine, fluentd has at-least-once delivery - occurrences[lineNumber]++ - } - } - - return expectedCount - len(occurrences), nil -} - -type LogEntry struct { - TextPayload string -} - -// Since GCL API is not easily available from the outside of cluster -// we use gcloud command to perform search with filter -func readFilteredEntriesFromGcl(filter string) ([]string, error) { - framework.Logf("Reading entries from GCL with filter '%v'", filter) - argList := []string{"beta", - "logging", - "read", - filter, - "--format", - "json", - "--project", - framework.TestContext.CloudConfig.ProjectID, - } - output, err := exec.Command("gcloud", argList...).CombinedOutput() - if err != nil { - return nil, err - } - - var entries []*LogEntry - if err = json.Unmarshal(output, &entries); err != nil { - return nil, err - } - framework.Logf("Read %d entries from GCL", len(entries)) - - var result []string - for _, entry := range entries { - if entry.TextPayload != "" { - result = append(result, entry.TextPayload) - } - } - - return result, nil -} diff --git a/test/e2e/cluster_logging_gcl_load.go 
b/test/e2e/cluster_logging_gcl_load.go new file mode 100644 index 00000000000..ccf647613dc --- /dev/null +++ b/test/e2e/cluster_logging_gcl_load.go @@ -0,0 +1,105 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "strconv" + "time" + + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/test/e2e/framework" + + . "github.com/onsi/ginkgo" +) + +const ( + // TODO(crassirostris): Once test is stable, decrease allowed loses + loadTestMaxAllowedLostFraction = 0.1 +) + +var _ = framework.KubeDescribe("Cluster level logging using GCL [Slow] [Flaky]", func() { + f := framework.NewDefaultFramework("gcl-logging-load") + + It("should create a constant load with long-living pods and ensure logs delivery", func() { + gclLogsProvider, err := newGclLogsProvider(f) + framework.ExpectNoError(err, "Failed to create GCL logs provider") + + podCount := 30 + loggingDuration := 10 * time.Minute + linesPerSecond := 1000 + linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount + ingestionTimeout := 1 * time.Hour + + By("Running logs generator pods") + pods := []*loggingPod{} + for podIdx := 0; podIdx < podCount; podIdx++ { + podName := f.Namespace.Name + "-logs-generator-" + strconv.Itoa(linesPerPod) + "-" + strconv.Itoa(podIdx) + pods = append(pods, createLoggingPod(f, podName, linesPerPod, loggingDuration)) + + defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) + } + + By("Waiting for pods 
to succeed") + time.Sleep(loggingDuration) + + By("Waiting for all log lines to be ingested") + err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction) + if err != nil { + framework.Failf("Failed to ingest logs: %v", err) + } else { + framework.Logf("Successfully ingested all logs") + } + }) + + It("should create a constant load with short-living pods and ensure logs delivery", func() { + gclLogsProvider, err := newGclLogsProvider(f) + framework.ExpectNoError(err, "Failed to create GCL logs provider") + + maxPodCount := 10 + jobDuration := 1 * time.Minute + linesPerPodPerSecond := 10 + testDuration := 1 * time.Hour + ingestionTimeout := 1 * time.Hour + + podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount)) + podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1 + linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds()) + + By("Running short-living pods") + pods := []*loggingPod{} + for i := 0; i < podRunCount; i++ { + podName := f.Namespace.Name + "-job-logs-generator-" + + strconv.Itoa(maxPodCount) + "-" + strconv.Itoa(linesPerPod) + "-" + strconv.Itoa(i) + pods = append(pods, createLoggingPod(f, podName, linesPerPod, jobDuration)) + + defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) + + time.Sleep(podRunDelay) + } + + By("Waiting for the last pods to finish") + time.Sleep(jobDuration) + + By("Waiting for all log lines to be ingested") + err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction) + if err != nil { + framework.Failf("Failed to ingest logs: %v", err) + } else { + framework.Logf("Successfully ingested all logs") + } + }) +}) diff --git a/test/e2e/cluster_logging_gcl_utils.go b/test/e2e/cluster_logging_gcl_utils.go new file mode 100644 index 00000000000..c372691634a --- /dev/null +++ b/test/e2e/cluster_logging_gcl_utils.go @@ -0,0 +1,127 @@ +/* +Copyright 2016 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "time" + + "k8s.io/kubernetes/test/e2e/framework" + + "golang.org/x/net/context" + "golang.org/x/oauth2/google" + gcl "google.golang.org/api/logging/v2beta1" +) + +const ( + // GCL doesn't support page size more than 1000 + gclPageSize = 1000 + + // If we failed to get response from GCL, it can be a random 500 or + // quota limit exceeded. So we retry for some time in case the problem will go away. + // Quota is enforced every 100 seconds, so we have to wait for more than + // that to reliably get the next portion. 
+ queryGclRetryDelay = 10 * time.Second + queryGclRetryTimeout = 200 * time.Second +) + +type gclLogsProvider struct { + GclService *gcl.Service + Framework *framework.Framework +} + +func (gclLogsProvider *gclLogsProvider) EnsureWorking() error { + // We assume that GCL is always working + return nil +} + +func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) { + hc, err := google.DefaultClient(context.Background(), gcl.CloudPlatformScope) + if err != nil { + return nil, err + } + gclService, err := gcl.New(hc) + if err != nil { return nil, err } + + provider := &gclLogsProvider{ + GclService: gclService, + Framework: f, + } + return provider, nil +} + +// ReadEntries queries GCL directly through the Cloud Logging API for entries +// produced by the given pod since its last ingested timestamp +func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []*logEntry { + filter := fmt.Sprintf("resource.labels.pod_id=%s AND resource.labels.namespace_id=%s AND timestamp>=\"%v\"", + pod.Name, gclLogsProvider.Framework.Namespace.Name, pod.LastTimestamp.Format(time.RFC3339)) + framework.Logf("Reading entries from GCL with filter '%v'", filter) + + response := getResponseSafe(gclLogsProvider.GclService, filter, "") + + var entries []*logEntry + for response != nil && len(response.Entries) > 0 { + framework.Logf("Received %d entries from GCL", len(response.Entries)) + + for _, entry := range response.Entries { + if entry.TextPayload == "" { + continue + } + + timestamp, parseErr := time.Parse(time.RFC3339, entry.Timestamp) + if parseErr != nil { + continue + } + + entries = append(entries, &logEntry{ + Timestamp: timestamp, + Payload: entry.TextPayload, + }) + } + + nextToken := response.NextPageToken + if nextToken == "" { + break + } + + response = getResponseSafe(gclLogsProvider.GclService, filter, response.NextPageToken) + } + + return entries +} + +func getResponseSafe(gclService *gcl.Service, filter string, pageToken string) *gcl.ListLogEntriesResponse { + for start := 
time.Now(); time.Since(start) < queryGclRetryTimeout; time.Sleep(queryGclRetryDelay) { + response, err := gclService.Entries.List(&gcl.ListLogEntriesRequest{ + ProjectIds: []string{ + framework.TestContext.CloudConfig.ProjectID, + }, + OrderBy: "timestamp desc", + Filter: filter, + PageSize: int64(gclPageSize), + PageToken: pageToken, + }).Do() + + if err == nil { + return response + } + + framework.Logf("Failed to get response from GCL due to %v, retrying", err) + } + + return nil +} diff --git a/test/e2e/cluster_logging_utils.go b/test/e2e/cluster_logging_utils.go index 62f7899b13f..36ac557a1bf 100644 --- a/test/e2e/cluster_logging_utils.go +++ b/test/e2e/cluster_logging_utils.go @@ -19,52 +19,202 @@ package e2e import ( "errors" "fmt" + "strconv" + "strings" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/api/resource" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/api" + api_v1 "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/test/e2e/framework" ) const ( - // ingestionTimeout is how long to keep retrying to wait for all the - // logs to be ingested. - ingestionTimeout = 10 * time.Minute - // ingestionRetryDelay is how long test should wait between - // two attempts to check for ingestion - ingestionRetryDelay = 25 * time.Second + // Duration of delay between any two attempts to check if all logs are ingested + ingestionRetryDelay = 10 * time.Second - synthLoggerPodName = "synthlogger" + // Amount of requested cores for logging container in millicores + loggingContainerCpuRequest = 10 - // expectedLinesCount is the number of log lines emitted (and checked) for each synthetic logging pod. 
- expectedLinesCount = 100 + // Amount of requested memory for logging container in bytes + loggingContainerMemoryRequest = 10 * 1024 * 1024 ) -func createSynthLogger(f *framework.Framework, linesCount int) { - f.PodClient().Create(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: synthLoggerPodName, - Namespace: f.Namespace.Name, +// Type to track the progress of logs generating pod +type loggingPod struct { + // Name of the pod + Name string + // If we didn't read some log entries, their + // timestamps should be no less than this timestamp. + // Effectively, timestamp of the last ingested entry + // for which there's no missing entry before it + LastTimestamp time.Time + // Cache of ingested and read entries + Occurrences map[int]*logEntry + // Number of lines expected to be ingested from this pod + ExpectedLinesNumber int +} + +type logEntry struct { + Payload string + Timestamp time.Time +} + +type logsProvider interface { + EnsureWorking() error + ReadEntries(*loggingPod) []*logEntry +} + +func (entry *logEntry) getLogEntryNumber() (int, bool) { + chunks := strings.Split(entry.Payload, " ") + lineNumber, err := strconv.Atoi(strings.TrimSpace(chunks[0])) + return lineNumber, err == nil +} + +func createLoggingPod(f *framework.Framework, podName string, totalLines int, loggingDuration time.Duration) *loggingPod { + framework.Logf("Starting pod %s", podName) + createLogsGeneratorPod(f, podName, totalLines, loggingDuration) + + return &loggingPod{ + Name: podName, + // It's used to avoid querying logs from before the pod was started + LastTimestamp: time.Now(), + Occurrences: make(map[int]*logEntry), + ExpectedLinesNumber: totalLines, + } +} + +func createLogsGeneratorPod(f *framework.Framework, podName string, linesCount int, duration time.Duration) { + f.PodClient().Create(&api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: podName, }, - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyOnFailure, - Containers: []v1.Container{ + Spec: api_v1.PodSpec{ + 
RestartPolicy: api_v1.RestartPolicyNever, + Containers: []api_v1.Container{ { - Name: synthLoggerPodName, - Image: "gcr.io/google_containers/busybox:1.24", - // notice: the subshell syntax is escaped with `$$` - Command: []string{"/bin/sh", "-c", fmt.Sprintf("i=0; while [ $i -lt %d ]; do echo $i; i=`expr $i + 1`; done", linesCount)}, + Name: podName, + Image: "gcr.io/google_containers/logs-generator:v0.1.0", + Env: []api_v1.EnvVar{ + { + Name: "LOGS_GENERATOR_LINES_TOTAL", + Value: strconv.Itoa(linesCount), + }, + { + Name: "LOGS_GENERATOR_DURATION", + Value: duration.String(), + }, + }, + Resources: api_v1.ResourceRequirements{ + Requests: api_v1.ResourceList{ + api_v1.ResourceCPU: *resource.NewMilliQuantity( + loggingContainerCpuRequest, + resource.DecimalSI), + api_v1.ResourceMemory: *resource.NewQuantity( + loggingContainerMemoryRequest, + resource.BinarySI), + }, + }, }, }, }, }) } -func reportLogsFromFluentdPod(f *framework.Framework) error { - synthLoggerPod, err := f.PodClient().Get(synthLoggerPodName, metav1.GetOptions{}) +func waitForLogsIngestion(logsProvider logsProvider, pods []*loggingPod, ingestionTimeout time.Duration, maxAllowedLostFraction float64) error { + expectedLinesNumber := 0 + for _, pod := range pods { + expectedLinesNumber += pod.ExpectedLinesNumber + } + + totalMissing := expectedLinesNumber + + missingByPod := make([]int, len(pods)) + for podIdx, pod := range pods { + missingByPod[podIdx] = pod.ExpectedLinesNumber + } + + for start := time.Now(); totalMissing > 0 && time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) { + missing := 0 + for podIdx, pod := range pods { + if missingByPod[podIdx] == 0 { + continue + } + + missingByPod[podIdx] = pullMissingLogsCount(logsProvider, pod) + missing += missingByPod[podIdx] + } + + totalMissing = missing + if totalMissing > 0 { + framework.Logf("Still missing %d lines in total", totalMissing) + } + } + + lostFraction := float64(totalMissing) / float64(expectedLinesNumber) + + if 
totalMissing > 0 { + framework.Logf("After %v still missing %d lines, %.2f%% of total number oflines", + ingestionTimeout, totalMissing, lostFraction*100) + } + + if lostFraction > maxAllowedLostFraction { + return fmt.Errorf("lost %.2f%% of lines, but only loss of %.2f%% can be tolerated", + lostFraction*100, maxAllowedLostFraction*100) + } + + return nil +} + +func pullMissingLogsCount(logsProvider logsProvider, pod *loggingPod) int { + missingOnPod, err := getMissingLinesCount(logsProvider, pod) if err != nil { - return fmt.Errorf("Failed to get synth logger pod due to %v", err) + framework.Logf("Failed to get missing lines count from pod %s due to %v", pod.Name, err) + return pod.ExpectedLinesNumber + } else if missingOnPod > 0 { + framework.Logf("Pod %s is missing %d lines", pod.Name, missingOnPod) + } else { + framework.Logf("All logs from pod %s are ingested", pod.Name) + } + return missingOnPod +} + +func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, error) { + entries := logsProvider.ReadEntries(pod) + + for _, entry := range entries { + lineNumber, ok := entry.getLogEntryNumber() + if !ok { + continue + } + + if lineNumber < 0 || lineNumber >= pod.ExpectedLinesNumber { + framework.Logf("Unexpected line number: %d", lineNumber) + } else { + pod.Occurrences[lineNumber] = entry + } + } + + for i := 0; i < pod.ExpectedLinesNumber; i++ { + entry, ok := pod.Occurrences[i] + if !ok { + break + } + + if entry.Timestamp.After(pod.LastTimestamp) { + pod.LastTimestamp = entry.Timestamp + } + } + + return pod.ExpectedLinesNumber - len(pod.Occurrences), nil +} + +func reportLogsFromFluentdPod(f *framework.Framework, pod *loggingPod) error { + synthLoggerPod, err := f.PodClient().Get(pod.Name, meta_v1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get synth logger pod due to %v", err) } synthLoggerNodeName := synthLoggerPod.Spec.NodeName @@ -73,20 +223,20 @@ func reportLogsFromFluentdPod(f *framework.Framework) error { } label 
:= labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "fluentd-logging"})) - options := metav1.ListOptions{LabelSelector: label.String()} - fluentdPods, err := f.ClientSet.Core().Pods(metav1.NamespaceSystem).List(options) + options := meta_v1.ListOptions{LabelSelector: label.String()} + fluentdPods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options) for _, fluentdPod := range fluentdPods.Items { if fluentdPod.Spec.NodeName == synthLoggerNodeName { containerName := fluentdPod.Spec.Containers[0].Name - logs, err := framework.GetPodLogs(f.ClientSet, metav1.NamespaceSystem, fluentdPod.Name, containerName) + logs, err := framework.GetPodLogs(f.ClientSet, meta_v1.NamespaceSystem, fluentdPod.Name, containerName) if err != nil { - return fmt.Errorf("Failed to get logs from fluentd pod %s due to %v", fluentdPod.Name, err) + return fmt.Errorf("failed to get logs from fluentd pod %s due to %v", fluentdPod.Name, err) } framework.Logf("Logs from fluentd pod %s:\n%s", fluentdPod.Name, logs) return nil } } - return fmt.Errorf("Failed to find fluentd pod running on node %s", synthLoggerNodeName) + return fmt.Errorf("failed to find fluentd pod running on node %s", synthLoggerNodeName) } diff --git a/test/test_owners.csv b/test/test_owners.csv index 47fc9c8157e..950bda90dda 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -11,6 +11,8 @@ Cassandra should create and scale cassandra,fabioy,1,apps, CassandraStatefulSet should create statefulset,wojtek-t,1,apps, Cluster level logging using Elasticsearch should check that logs from containers are ingested into Elasticsearch,crassirostris,0,instrumentation, Cluster level logging using GCL should check that logs from containers are ingested in GCL,crassirostris,0,instrumentation, +Cluster level logging using GCL should create a constant load with long-living pods and ensure logs delivery,crassirostris,0,instrumentation, +Cluster level logging using GCL should create a constant load with short-living 
pods and ensure logs delivery,crassirostris,0,instrumentation, Cluster size autoscaling should add node to the particular mig,spxtr,1,autoscaling, Cluster size autoscaling should correctly scale down after a node is not needed,pmorie,1,autoscaling, Cluster size autoscaling should correctly scale down after a node is not needed when there is non autoscaled pool,krousey,1,autoscaling,