diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh
index 8555b4ba32c..1e32dac84fd 100755
--- a/cluster/gce/config-test.sh
+++ b/cluster/gce/config-test.sh
@@ -122,7 +122,7 @@ KUBEPROXY_TEST_ARGS="${KUBEPROXY_TEST_ARGS:-} ${TEST_CLUSTER_API_CONTENT_TYPE}"
 # Optional: Enable node logging.
 ENABLE_NODE_LOGGING="${KUBE_ENABLE_NODE_LOGGING:-true}"
-LOGGING_DESTINATION="${KUBE_LOGGING_DESTINATION:-elasticsearch}" # options: elasticsearch, gcp
+LOGGING_DESTINATION="${KUBE_LOGGING_DESTINATION:-gcp}" # options: elasticsearch, gcp
 
 # Optional: When set to true, Elasticsearch and Kibana will be setup as part of the cluster bring up.
 ENABLE_CLUSTER_LOGGING="${KUBE_ENABLE_CLUSTER_LOGGING:-true}"
diff --git a/hack/verify-flags/exceptions.txt b/hack/verify-flags/exceptions.txt
index 1726ca1a883..f3222df605f 100644
--- a/hack/verify-flags/exceptions.txt
+++ b/hack/verify-flags/exceptions.txt
@@ -119,9 +119,9 @@ test/e2e/common/host_path.go: fmt.Sprintf("--file_content_in_loop=%v", filePat
 test/e2e/common/host_path.go: fmt.Sprintf("--file_content_in_loop=%v", filePathInReader),
 test/e2e/common/host_path.go: fmt.Sprintf("--retry_time=%d", retryDuration),
 test/e2e/common/host_path.go: fmt.Sprintf("--retry_time=%d", retryDuration),
-test/e2e/es_cluster_logging.go: framework.Failf("No cluster_name field in Elasticsearch response: %v", esResponse)
-test/e2e/es_cluster_logging.go: // Check to see if have a cluster_name field.
-test/e2e/es_cluster_logging.go: clusterName, ok := esResponse["cluster_name"]
+test/e2e/cluster_logging_es.go: return fmt.Errorf("No cluster_name field in Elasticsearch response: %v", esResponse)
+test/e2e/cluster_logging_es.go: // Check to see if we have a cluster_name field.
+test/e2e/cluster_logging_es.go: clusterName, ok := esResponse["cluster_name"]
 test/e2e_node/container_manager_test.go: return fmt.Errorf("expected pid %d's oom_score_adj to be %d; found %d", pid, expectedOOMScoreAdj, oomScore)
 test/e2e_node/container_manager_test.go: return fmt.Errorf("expected pid %d's oom_score_adj to be < %d; found %d", pid, expectedMaxOOMScoreAdj, oomScore)
 test/e2e_node/container_manager_test.go: return fmt.Errorf("expected pid %d's oom_score_adj to be >= %d; found %d", pid, expectedMinOOMScoreAdj, oomScore)
diff --git a/test/e2e/cluster_logging_es.go b/test/e2e/cluster_logging_es.go
new file mode 100644
index 00000000000..40847b2c7f9
--- /dev/null
+++ b/test/e2e/cluster_logging_es.go
@@ -0,0 +1,288 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/test/e2e/framework"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+const (
+	// graceTime is how long to keep retrying requests to Elasticsearch for status information.
+	graceTime = 5 * time.Minute
+)
+
+var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Feature:Elasticsearch]", func() {
+	f := framework.NewDefaultFramework("es-logging")
+
+	BeforeEach(func() {
+		// TODO: For now assume we are only testing cluster logging with Elasticsearch
+		// on GCE. Once we are sure that Elasticsearch cluster level logging
+		// works for other providers we should widen the scope of this test.
+		framework.SkipUnlessProviderIs("gce")
+	})
+
+	It("should check that logs from containers are ingested into Elasticsearch", func() {
+		err := checkElasticsearchReadiness(f)
+		framework.ExpectNoError(err, "Elasticsearch failed to start")
+
+		By("Running synthetic logger")
+		createSynthLogger(f, expectedLinesCount)
+		defer f.PodClient().Delete(synthLoggerPodName, &api.DeleteOptions{})
+		err = framework.WaitForPodSuccessInNamespace(f.Client, synthLoggerPodName, synthLoggerPodName, f.Namespace.Name)
+		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", synthLoggerPodName))
+
+		By("Waiting for logs to ingest")
+		totalMissing := expectedLinesCount
+		for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
+			totalMissing, err = getMissingLinesCountElasticsearch(f, expectedLinesCount)
+			if err != nil {
+				framework.Logf("Failed to get missing lines count due to %v", err)
+				totalMissing = expectedLinesCount
+			} else if totalMissing > 0 {
+				framework.Logf("Still missing %d lines", totalMissing)
+			}
+
+			if totalMissing == 0 {
+				break
+			}
+		}
+
+		Expect(totalMissing).To(Equal(0), "Some log lines are still missing")
+	})
+})
+
+// checkElasticsearchReadiness ensures that Elasticsearch is running and ready to serve requests.
+func checkElasticsearchReadiness(f *framework.Framework) error {
+	// Check for the existence of the Elasticsearch service.
+	By("Checking the Elasticsearch service exists.")
+	s := f.Client.Services(api.NamespaceSystem)
+	// Make a few attempts to connect. This makes the test robust against
+	// being run as the first e2e test just after the e2e cluster has been created.
+	var err error
+	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
+		if _, err = s.Get("elasticsearch-logging"); err == nil {
+			break
+		}
+		framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
+	}
+	Expect(err).NotTo(HaveOccurred())
+
+	// Wait for the Elasticsearch pods to enter the running state.
+	By("Checking to make sure the Elasticsearch pods are running")
+	label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "elasticsearch-logging"}))
+	options := api.ListOptions{LabelSelector: label}
+	pods, err := f.Client.Pods(api.NamespaceSystem).List(options)
+	Expect(err).NotTo(HaveOccurred())
+	for _, pod := range pods.Items {
+		err = framework.WaitForPodRunningInNamespace(f.Client, &pod)
+		Expect(err).NotTo(HaveOccurred())
+	}
+
+	By("Checking to make sure we are talking to an Elasticsearch service.")
+	// Perform a few checks to make sure this looks like an Elasticsearch cluster.
+	var statusCode float64
+	var esResponse map[string]interface{}
+	err = nil
+	var body []byte
+	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) {
+		proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
+		if errProxy != nil {
+			framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
+			continue
+		}
+		// Query against the root URL for Elasticsearch.
+		body, err = proxyRequest.Namespace(api.NamespaceSystem).
+			Name("elasticsearch-logging").
+			DoRaw()
+		if err != nil {
+			framework.Logf("After %v proxy call to elasticsearch-logging failed: %v", time.Since(start), err)
+			continue
+		}
+		err = json.Unmarshal(body, &esResponse)
+		if err != nil {
+			framework.Logf("After %v failed to convert Elasticsearch JSON response %v to map[string]interface{}: %v", time.Since(start), string(body), err)
+			continue
+		}
+		statusIntf, ok := esResponse["status"]
+		if !ok {
+			framework.Logf("After %v Elasticsearch response has no status field: %v", time.Since(start), esResponse)
+			continue
+		}
+		statusCode, ok = statusIntf.(float64)
+		if !ok {
+			// Assume the status came back as a string reporting failure. Retry.
+			framework.Logf("After %v expected status to be a float64 but got %v of type %T", time.Since(start), statusIntf, statusIntf)
+			continue
+		}
+		if int(statusCode) != 200 {
+			framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode)
+			continue
+		}
+		break
+	}
+
+	if err != nil {
+		return err
+	}
+
+	if int(statusCode) != 200 {
+		return fmt.Errorf("Elasticsearch cluster has a bad status: %v", statusCode)
+	}
+
+	// Check to see if we have a cluster_name field.
+	clusterName, ok := esResponse["cluster_name"]
+	if !ok {
+		return fmt.Errorf("No cluster_name field in Elasticsearch response: %v", esResponse)
+	}
+
+	if clusterName != "kubernetes-logging" {
+		return fmt.Errorf("Connected to wrong cluster %q (expecting kubernetes-logging)", clusterName)
+	}
+
+	// Now assume we really are talking to an Elasticsearch instance.
+	// Check the cluster health.
+	By("Checking health of Elasticsearch service.")
+	healthy := false
+	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
+		proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
+		if errProxy != nil {
+			framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
+			continue
+		}
+		body, err = proxyRequest.Namespace(api.NamespaceSystem).
+			Name("elasticsearch-logging").
+			Suffix("_cluster/health").
+			Param("level", "indices").
+			DoRaw()
+		if err != nil {
+			continue
+		}
+
+		var health map[string]interface{}
+		err := json.Unmarshal(body, &health)
+		if err != nil {
+			framework.Logf("Bad JSON response from Elasticsearch: %v", err)
+			continue
+		}
+		statusIntf, ok := health["status"]
+		if !ok {
+			framework.Logf("No status field found in cluster health response: %v", health)
+			continue
+		}
+		status, _ := statusIntf.(string)
+		if status != "green" && status != "yellow" {
+			framework.Logf("Cluster health has bad status: %v", health)
+			continue
+		}
+		if err == nil && ok {
+			healthy = true
+			break
+		}
+	}
+
+	if !healthy {
+		return fmt.Errorf("After %v Elasticsearch cluster is not healthy", graceTime)
+	}
+
+	return nil
+}
+
+func getMissingLinesCountElasticsearch(f *framework.Framework, expectedCount int) (int, error) {
+	proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
+	if errProxy != nil {
+		return 0, fmt.Errorf("Failed to get services proxy request: %v", errProxy)
+	}
+
+	// Ask Elasticsearch to return all the log lines that were tagged with the
+	// name of the synthetic logger. The size parameter caps the number of
+	// returned hits at the number of lines we expect.
+	body, err := proxyRequest.Namespace(api.NamespaceSystem).
+		Name("elasticsearch-logging").
+		Suffix("_search").
+		// TODO: Change filter to only match records from current test run
+		// after fluent-plugin-kubernetes_metadata_filter is enabled
+		// and optimize current query
+		Param("q", "tag:*synthlogger*").
+		Param("size", strconv.Itoa(expectedCount)).
+		DoRaw()
+	if err != nil {
+		return 0, fmt.Errorf("Failed to make proxy call to elasticsearch-logging: %v", err)
+	}
+
+	var response map[string]interface{}
+	err = json.Unmarshal(body, &response)
+	if err != nil {
+		return 0, fmt.Errorf("Failed to unmarshal response: %v", err)
+	}
+
+	hits, ok := response["hits"].(map[string]interface{})
+	if !ok {
+		return 0, fmt.Errorf("response[hits] not of the expected type: %T", response["hits"])
+	}
+
+	h, ok := hits["hits"].([]interface{})
+	if !ok {
+		return 0, fmt.Errorf("Hits not of the expected type: %T", hits["hits"])
+	}
+
+	// Initialize the data structure for counting observed lines.
+	counts := make(map[int]int)
+
+	// Iterate over the hits and populate the counts map.
+	for _, e := range h {
+		l, ok := e.(map[string]interface{})
+		if !ok {
+			framework.Logf("Element of hit not of expected type: %T", e)
+			continue
+		}
+		source, ok := l["_source"].(map[string]interface{})
+		if !ok {
+			framework.Logf("_source not of the expected type: %T", l["_source"])
+			continue
+		}
+		msg, ok := source["log"].(string)
+		if !ok {
+			framework.Logf("Log not of the expected type: %T", source["log"])
+			continue
+		}
+		lineNumber, err := strconv.Atoi(strings.TrimSpace(msg))
+		if err != nil {
+			framework.Logf("Log line %s is not a number", msg)
+			continue
+		}
+		if lineNumber < 0 || lineNumber >= expectedCount {
+			framework.Logf("Number %d is not valid, expected a number in the range [0, %d)", lineNumber, expectedCount)
+			continue
+		}
+		// Record the observation of a log line
+		// Duplicates are possible and fine, fluentd has at-least-once delivery
+		counts[lineNumber]++
+	}
+
+	return expectedCount - len(counts), nil
+}
diff --git a/test/e2e/cluster_logging_gcl.go b/test/e2e/cluster_logging_gcl.go
new file mode 100644
index 00000000000..fa708f9f151
--- /dev/null
+++ b/test/e2e/cluster_logging_gcl.go
@@ -0,0 +1,130 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"fmt"
+	"os/exec"
+	"strconv"
+	"strings"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/util/json"
+	"k8s.io/kubernetes/test/e2e/framework"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
+	f := framework.NewDefaultFramework("gcl-logging")
+
+	BeforeEach(func() {
+		// TODO (crassirostris): Expand to GKE once the test is stable
+		framework.SkipUnlessProviderIs("gce")
+	})
+
+	It("should check that logs from containers are ingested in GCL", func() {
+		By("Running synthetic logger")
+		createSynthLogger(f, expectedLinesCount)
+		defer f.PodClient().Delete(synthLoggerPodName, &api.DeleteOptions{})
+		err := framework.WaitForPodSuccessInNamespace(f.Client, synthLoggerPodName, synthLoggerPodName, f.Namespace.Name)
+		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", synthLoggerPodName))
+
+		By("Waiting for logs to ingest")
+		totalMissing := expectedLinesCount
+		for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
+			var err error
+			totalMissing, err = getMissingLinesCountGcl(f, synthLoggerPodName, expectedLinesCount)
+			if err != nil {
+				framework.Logf("Failed to get missing lines count due to %v", err)
+				totalMissing = expectedLinesCount
+			} else if totalMissing > 0 {
+				framework.Logf("Still missing %d lines", totalMissing)
+			}
+
+			if totalMissing == 0 {
+				break
+			}
+		}
+
+		Expect(totalMissing).To(Equal(0), "Some log lines are still missing")
+	})
+})
+
+func getMissingLinesCountGcl(f *framework.Framework, podName string, expectedCount int) (int, error) {
+	gclFilter := fmt.Sprintf("resource.labels.pod_id:%s AND resource.labels.namespace_id:%s", podName, f.Namespace.Name)
+	entries, err := readFilteredEntriesFromGcl(gclFilter)
+	if err != nil {
+		return 0, err
+	}
+
+	occurrences := make(map[int]int)
+	for _, entry := range entries {
+		lineNumber, err := strconv.Atoi(strings.TrimSpace(entry))
+		if err != nil {
+			continue
+		}
+		if lineNumber < 0 || lineNumber >= expectedCount {
+			framework.Logf("Unexpected line number: %d", lineNumber)
+		} else {
+			// Duplicates are possible and fine, fluentd has at-least-once delivery
+			occurrences[lineNumber]++
+		}
+	}
+
+	return expectedCount - len(occurrences), nil
+}
+
+type LogEntry struct {
+	TextPayload string
+}
+
+// Since the GCL API is not easily accessible from outside the cluster,
+// we use the gcloud command-line tool to perform the search with the filter.
+func readFilteredEntriesFromGcl(filter string) ([]string, error) {
+	framework.Logf("Reading entries from GCL with filter '%v'", filter)
+	argList := []string{"beta",
+		"logging",
+		"read",
+		filter,
+		"--format",
+		"json",
+		"--project",
+		framework.TestContext.CloudConfig.ProjectID,
+	}
+	output, err := exec.Command("gcloud", argList...).CombinedOutput()
+	if err != nil {
+		return nil, err
+	}
+
+	var entries []*LogEntry
+	if err = json.Unmarshal(output, &entries); err != nil {
+		return nil, err
+	}
+	framework.Logf("Read %d entries from GCL", len(entries))
+
+	var result []string
+	for _, entry := range entries {
+		if entry.TextPayload != "" {
+			result = append(result, entry.TextPayload)
+		}
+	}
+
+	return result, nil
+}
diff --git a/test/e2e/cluster_logging_utils.go b/test/e2e/cluster_logging_utils.go
new file mode 100644
index 00000000000..7f001742553
--- /dev/null
+++ b/test/e2e/cluster_logging_utils.go
@@ -0,0 +1,60 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"fmt"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/test/e2e/framework"
+)
+
+const (
+	// ingestionTimeout is how long to keep retrying while waiting for all the
+	// logs to be ingested.
+	ingestionTimeout = 10 * time.Minute
+	// ingestionRetryDelay is how long the test should wait between
+	// two consecutive attempts to check for ingestion
+	ingestionRetryDelay = 25 * time.Second
+
+	synthLoggerPodName = "synthlogger"
+
+	// expectedLinesCount is the number of log lines emitted (and checked) for each synthetic logging pod.
+	expectedLinesCount = 100
+)
+
+func createSynthLogger(f *framework.Framework, linesCount int) {
+	f.PodClient().Create(&api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Name:      synthLoggerPodName,
+			Namespace: f.Namespace.Name,
+		},
+		Spec: api.PodSpec{
+			// Don't restart the pod, since it is expected to exit on its own.
+			RestartPolicy: api.RestartPolicyNever,
+			Containers: []api.Container{
+				{
+					Name:  synthLoggerPodName,
+					Image: "gcr.io/google_containers/busybox:1.24",
+					// Print the integers from 0 to linesCount-1, one per line.
+					Command: []string{"/bin/sh", "-c", fmt.Sprintf("i=0; while [ $i -lt %d ]; do echo $i; i=`expr $i + 1`; done", linesCount)},
+				},
+			},
+		},
+	})
+}
diff --git a/test/e2e/es_cluster_logging.go b/test/e2e/es_cluster_logging.go
deleted file mode 100644
index 3f6d637f7ef..00000000000
--- a/test/e2e/es_cluster_logging.go
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package e2e
-
-import (
-	"encoding/json"
-	"fmt"
-	"strconv"
-	"strings"
-	"time"
-
-	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/labels"
-	"k8s.io/kubernetes/test/e2e/framework"
-
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
-)
-
-var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Feature:Elasticsearch]", func() {
-	f := framework.NewDefaultFramework("es-logging")
-
-	BeforeEach(func() {
-		// TODO: For now assume we are only testing cluster logging with Elasticsearch
-		// on GCE. Once we are sure that Elasticsearch cluster level logging
-		// works for other providers we should widen this scope of this test.
-		framework.SkipUnlessProviderIs("gce")
-	})
-
-	It("should check that logs from pods on all nodes are ingested into Elasticsearch", func() {
-		ClusterLevelLoggingWithElasticsearch(f)
-	})
-})
-
-const (
-	k8sAppKey    = "k8s-app"
-	esValue      = "elasticsearch-logging"
-	fluentdValue = "fluentd-logging"
-)
-
-func bodyToJSON(body []byte) (map[string]interface{}, error) {
-	var r map[string]interface{}
-	if err := json.Unmarshal(body, &r); err != nil {
-		framework.Logf("Bad JSON: %s", string(body))
-		return nil, fmt.Errorf("failed to unmarshal Elasticsearch response: %v", err)
-	}
-	return r, nil
-}
-
-func nodeInNodeList(nodeName string, nodeList *api.NodeList) bool {
-	for _, node := range nodeList.Items {
-		if nodeName == node.Name {
-			return true
-		}
-	}
-	return false
-}
-
-// ClusterLevelLoggingWithElasticsearch is an end to end test for cluster level logging.
-func ClusterLevelLoggingWithElasticsearch(f *framework.Framework) {
-	// graceTime is how long to keep retrying requests for status information.
-	const graceTime = 5 * time.Minute
-	// ingestionTimeout is how long to keep retrying to wait for all the
-	// logs to be ingested.
-	const ingestionTimeout = 10 * time.Minute
-
-	// Check for the existence of the Elasticsearch service.
-	By("Checking the Elasticsearch service exists.")
-	s := f.Client.Services(api.NamespaceSystem)
-	// Make a few attempts to connect. This makes the test robust against
-	// being run as the first e2e test just after the e2e cluster has been created.
-	var err error
-	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
-		if _, err = s.Get("elasticsearch-logging"); err == nil {
-			break
-		}
-		framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
-	}
-	Expect(err).NotTo(HaveOccurred())
-
-	// Wait for the Elasticsearch pods to enter the running state.
-	By("Checking to make sure the Elasticsearch pods are running")
-	label := labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: esValue}))
-	options := api.ListOptions{LabelSelector: label}
-	pods, err := f.Client.Pods(api.NamespaceSystem).List(options)
-	Expect(err).NotTo(HaveOccurred())
-	for _, pod := range pods.Items {
-		err = framework.WaitForPodRunningInNamespace(f.Client, &pod)
-		Expect(err).NotTo(HaveOccurred())
-	}
-
-	By("Checking to make sure we are talking to an Elasticsearch service.")
-	// Perform a few checks to make sure this looks like an Elasticsearch cluster.
-	var statusCode float64
-	var esResponse map[string]interface{}
-	err = nil
-	var body []byte
-	for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) {
-		proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
-		if errProxy != nil {
-			framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
-			continue
-		}
-		// Query against the root URL for Elasticsearch.
-		body, err = proxyRequest.Namespace(api.NamespaceSystem).
-			Name("elasticsearch-logging").
- DoRaw() - if err != nil { - framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err) - continue - } - esResponse, err = bodyToJSON(body) - if err != nil { - framework.Logf("After %v failed to convert Elasticsearch JSON response %v to map[string]interface{}: %v", time.Since(start), string(body), err) - continue - } - statusIntf, ok := esResponse["status"] - if !ok { - framework.Logf("After %v Elasticsearch response has no status field: %v", time.Since(start), esResponse) - continue - } - statusCode, ok = statusIntf.(float64) - if !ok { - // Assume this is a string returning Failure. Retry. - framework.Logf("After %v expected status to be a float64 but got %v of type %T", time.Since(start), statusIntf, statusIntf) - continue - } - if int(statusCode) != 200 { - framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode) - continue - } - break - } - Expect(err).NotTo(HaveOccurred()) - if int(statusCode) != 200 { - framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode) - } - // Check to see if have a cluster_name field. - clusterName, ok := esResponse["cluster_name"] - if !ok { - framework.Failf("No cluster_name field in Elasticsearch response: %v", esResponse) - } - if clusterName != "kubernetes-logging" { - framework.Failf("Connected to wrong cluster %q (expecting kubernetes_logging)", clusterName) - } - - // Now assume we really are talking to an Elasticsearch instance. - // Check the cluster health. - By("Checking health of Elasticsearch service.") - healthy := false - for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) { - proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get()) - if errProxy != nil { - framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy) - continue - } - body, err = proxyRequest.Namespace(api.NamespaceSystem). - Name("elasticsearch-logging"). - Suffix("_cluster/health"). - Param("level", "indices"). - DoRaw() - if err != nil { - continue - } - health, err := bodyToJSON(body) - if err != nil { - framework.Logf("Bad json response from elasticsearch: %v", err) - continue - } - statusIntf, ok := health["status"] - if !ok { - framework.Logf("No status field found in cluster health response: %v", health) - continue - } - status := statusIntf.(string) - if status != "green" && status != "yellow" { - framework.Logf("Cluster health has bad status: %v", health) - continue - } - if err == nil && ok { - healthy = true - break - } - } - if !healthy { - framework.Failf("After %v elasticsearch cluster is not healthy", graceTime) - } - - // Obtain a list of nodes so we can place one synthetic logger on each node. - nodes := framework.GetReadySchedulableNodesOrDie(f.Client) - nodeCount := len(nodes.Items) - if nodeCount == 0 { - framework.Failf("Failed to find any nodes") - } - framework.Logf("Found %d nodes.", len(nodes.Items)) - - // Filter out unhealthy nodes. - // Previous tests may have cause failures of some nodes. Let's skip - // 'Not Ready' nodes, just in case (there is no need to fail the test). - framework.FilterNodes(nodes, func(node api.Node) bool { - return framework.IsNodeConditionSetAsExpected(&node, api.NodeReady, true) - }) - if len(nodes.Items) < 2 { - framework.Failf("Less than two nodes were found Ready: %d", len(nodes.Items)) - } - framework.Logf("Found %d healthy nodes.", len(nodes.Items)) - - // Wait for the Fluentd pods to enter the running state. 
- By("Checking to make sure the Fluentd pod are running on each healthy node") - label = labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: fluentdValue})) - options = api.ListOptions{LabelSelector: label} - fluentdPods, err := f.Client.Pods(api.NamespaceSystem).List(options) - Expect(err).NotTo(HaveOccurred()) - for _, pod := range fluentdPods.Items { - if nodeInNodeList(pod.Spec.NodeName, nodes) { - err = framework.WaitForPodRunningInNamespace(f.Client, &pod) - Expect(err).NotTo(HaveOccurred()) - } - } - - // Check if each healthy node has fluentd running on it - for _, node := range nodes.Items { - exists := false - for _, pod := range fluentdPods.Items { - if pod.Spec.NodeName == node.Name { - exists = true - break - } - } - if !exists { - framework.Failf("Node %v does not have fluentd pod running on it.", node.Name) - } - } - - // Create a unique root name for the resources in this test to permit - // parallel executions of this test. - // Use a unique namespace for the resources created in this test. - ns := f.Namespace.Name - name := "synthlogger" - // Form a unique name to taint log lines to be collected. - // Replace '-' characters with '_' to prevent the analyzer from breaking apart names. - taintName := strings.Replace(ns+name, "-", "_", -1) - framework.Logf("Tainting log lines with %v", taintName) - // podNames records the names of the synthetic logging pods that are created in the - // loop below. - var podNames []string - // countTo is the number of log lines emitted (and checked) for each synthetic logging pod. - const countTo = 100 - // Instantiate a synthetic logger pod on each node. - for i, node := range nodes.Items { - podName := fmt.Sprintf("%s-%d", name, i) - _, err := f.Client.Pods(ns).Create(&api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: podName, - Labels: map[string]string{"name": name}, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "synth-logger", - Image: "gcr.io/google_containers/ubuntu:14.04", - // notice: the subshell syntax is escaped with `$$` - Command: []string{"bash", "-c", fmt.Sprintf("i=0; while ((i < %d)); do echo \"%d %s $i %s\"; i=$$(($i+1)); done", countTo, i, taintName, podName)}, - }, - }, - NodeName: node.Name, - RestartPolicy: api.RestartPolicyNever, - }, - }) - Expect(err).NotTo(HaveOccurred()) - podNames = append(podNames, podName) - } - - // Cleanup the pods when we are done. - defer func() { - for _, pod := range podNames { - if err = f.Client.Pods(ns).Delete(pod, nil); err != nil { - framework.Logf("Failed to delete pod %s: %v", pod, err) - } - } - }() - - // Wait for the synthetic logging pods to finish. - By("Waiting for the pods to succeed.") - for _, pod := range podNames { - err = framework.WaitForPodSuccessInNamespace(f.Client, pod, "synth-logger", ns) - Expect(err).NotTo(HaveOccurred()) - } - - // Make several attempts to observe the logs ingested into Elasticsearch. - By("Checking all the log lines were ingested into Elasticsearch") - totalMissing := 0 - expected := nodeCount * countTo - missingPerNode := []int{} - for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(25 * time.Second) { - - // Debugging code to report the status of the elasticsearch logging endpoints. 
- selector := labels.Set{k8sAppKey: esValue}.AsSelector() - options := api.ListOptions{LabelSelector: selector} - esPods, err := f.Client.Pods(api.NamespaceSystem).List(options) - if err != nil { - framework.Logf("Attempt to list Elasticsearch nodes encountered a problem -- may retry: %v", err) - continue - } else { - for i, pod := range esPods.Items { - framework.Logf("pod %d: %s PodIP %s phase %s condition %+v", i, pod.Name, pod.Status.PodIP, pod.Status.Phase, - pod.Status.Conditions) - } - } - - proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get()) - if errProxy != nil { - framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy) - continue - } - // Ask Elasticsearch to return all the log lines that were tagged with the underscore - // version of the name. Ask for twice as many log lines as we expect to check for - // duplication bugs. - body, err = proxyRequest.Namespace(api.NamespaceSystem). - Name("elasticsearch-logging"). - Suffix("_search"). - Param("q", fmt.Sprintf("log:%s", taintName)). - Param("size", strconv.Itoa(2*expected)). - DoRaw() - if err != nil { - framework.Logf("After %v failed to make proxy call to elasticsearch-logging: %v", time.Since(start), err) - continue - } - - response, err := bodyToJSON(body) - if err != nil { - framework.Logf("After %v failed to unmarshal response: %v", time.Since(start), err) - framework.Logf("Body: %s", string(body)) - continue - } - hits, ok := response["hits"].(map[string]interface{}) - if !ok { - framework.Logf("response[hits] not of the expected type: %T", response["hits"]) - continue - } - totalF, ok := hits["total"].(float64) - if !ok { - framework.Logf("After %v hits[total] not of the expected type: %T", time.Since(start), hits["total"]) - continue - } - total := int(totalF) - if total != expected { - framework.Logf("After %v expecting to find %d log lines but saw %d", time.Since(start), expected, total) - } - h, ok := hits["hits"].([]interface{}) - if !ok { - framework.Logf("After %v hits not of the expected type: %T", time.Since(start), hits["hits"]) - continue - } - // Initialize data-structure for observing counts. - observed := make([][]int, nodeCount) - for i := range observed { - observed[i] = make([]int, countTo) - } - // Iterate over the hits and populate the observed array. 
- for _, e := range h { - l, ok := e.(map[string]interface{}) - if !ok { - framework.Logf("element of hit not of expected type: %T", e) - continue - } - source, ok := l["_source"].(map[string]interface{}) - if !ok { - framework.Logf("_source not of the expected type: %T", l["_source"]) - continue - } - msg, ok := source["log"].(string) - if !ok { - framework.Logf("log not of the expected type: %T", source["log"]) - continue - } - words := strings.Split(msg, " ") - if len(words) != 4 { - framework.Logf("Malformed log line: %s", msg) - continue - } - n, err := strconv.ParseUint(words[0], 10, 0) - if err != nil { - framework.Logf("Expecting numer of node as first field of %s", msg) - continue - } - if n < 0 || int(n) >= nodeCount { - framework.Logf("Node count index out of range: %d", nodeCount) - continue - } - index, err := strconv.ParseUint(words[2], 10, 0) - if err != nil { - framework.Logf("Expecting number as third field of %s", msg) - continue - } - if index < 0 || index >= countTo { - framework.Logf("Index value out of range: %d", index) - continue - } - if words[1] != taintName { - framework.Logf("Elasticsearch query return unexpected log line: %s", msg) - continue - } - // Record the observation of a log line from node n at the given index. - observed[n][index]++ - } - // Make sure we correctly observed the expected log lines from each node. - totalMissing = 0 - missingPerNode = make([]int, nodeCount) - incorrectCount := false - for n := range observed { - for i, c := range observed[n] { - if c == 0 { - totalMissing++ - missingPerNode[n]++ - } - if c < 0 || c > 1 { - framework.Logf("Got incorrect count for node %d index %d: %d", n, i, c) - incorrectCount = true - } - } - } - if incorrectCount { - framework.Logf("After %v es still return duplicated log lines", time.Since(start)) - continue - } - if totalMissing != 0 { - framework.Logf("After %v still missing %d log lines", time.Since(start), totalMissing) - continue - } - framework.Logf("After %s found all %d log lines", time.Since(start), expected) - return - } - for n := range missingPerNode { - if missingPerNode[n] > 0 { - framework.Logf("Node %d %s is missing %d logs", n, nodes.Items[n].Name, missingPerNode[n]) - opts := &api.PodLogOptions{} - body, err = f.Client.Pods(ns).GetLogs(podNames[n], opts).DoRaw() - if err != nil { - framework.Logf("Cannot get logs from pod %v", podNames[n]) - continue - } - framework.Logf("Pod %s has the following logs: %s", podNames[n], body) - - for _, pod := range fluentdPods.Items { - if pod.Spec.NodeName == nodes.Items[n].Name { - body, err = f.Client.Pods(api.NamespaceSystem).GetLogs(pod.Name, opts).DoRaw() - if err != nil { - framework.Logf("Cannot get logs from pod %v", pod.Name) - break - } - framework.Logf("Fluentd Pod %s on node %s has the following logs: %s", pod.Name, nodes.Items[n].Name, body) - break - } - } - } - } - framework.Failf("Failed to find all %d log lines", expected) -} diff --git a/test/e2e/kibana_logging.go b/test/e2e/kibana_logging.go index 3bb660d0810..52a756047e4 100644 --- a/test/e2e/kibana_logging.go +++ b/test/e2e/kibana_logging.go @@ -27,7 +27,7 @@ import ( . "github.com/onsi/gomega" ) -var _ = framework.KubeDescribe("Kibana Logging Instances Is Alive", func() { +var _ = framework.KubeDescribe("Kibana Logging Instances Is Alive [Feature:Elasticsearch]", func() { f := framework.NewDefaultFramework("kibana-logging") BeforeEach(func() {
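
Usage sketch (not from the diff; flag spellings assume the hack/e2e.go runner conventions of this era, so verify against your checkout): with LOGGING_DESTINATION now defaulting to gcp, the GCL suite runs against a stock GCE test cluster, while the Elasticsearch and Kibana suites are tagged [Feature:Elasticsearch] and expect a cluster brought up with Elasticsearch as the logging sink.

    # Bring up a GCE test cluster that ships logs to Elasticsearch
    KUBE_LOGGING_DESTINATION=elasticsearch go run hack/e2e.go -v --up

    # Focus Ginkgo on the cluster-logging suites added by this change
    go run hack/e2e.go -v --test --test_args="--ginkgo.focus=Cluster\slevel\slogging"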