Mirror of https://github.com/k3s-io/kubernetes.git

Merge pull request #33278 from Crassirostris/gcl-e2e-test (commit 12b133577e)

Automatic merge from submit-queue

Add gcl cluster logging test

This PR changes the default logging destination for tests to gcp and adds a test for cluster logging using Google Cloud Logging.

Fix #20760
@@ -122,7 +122,7 @@ KUBEPROXY_TEST_ARGS="${KUBEPROXY_TEST_ARGS:-} ${TEST_CLUSTER_API_CONTENT_TYPE}"
 
 # Optional: Enable node logging.
 ENABLE_NODE_LOGGING="${KUBE_ENABLE_NODE_LOGGING:-true}"
-LOGGING_DESTINATION="${KUBE_LOGGING_DESTINATION:-elasticsearch}" # options: elasticsearch, gcp
+LOGGING_DESTINATION="${KUBE_LOGGING_DESTINATION:-gcp}" # options: elasticsearch, gcp
 
 # Optional: When set to true, Elasticsearch and Kibana will be setup as part of the cluster bring up.
 ENABLE_CLUSTER_LOGGING="${KUBE_ENABLE_CLUSTER_LOGGING:-true}"
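Note (not part of this commit): the default above is consumed at cluster bring-up, so a run that still wants Elasticsearch-based cluster logging can override it through the environment. A minimal sketch, assuming the usual GCE e2e workflow via hack/e2e.go:

    # Keep shipping cluster logs to Elasticsearch instead of the new gcp default.
    export KUBE_LOGGING_DESTINATION=elasticsearch
    export KUBE_ENABLE_CLUSTER_LOGGING=true
    go run hack/e2e.go -v --up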
@@ -119,9 +119,9 @@ test/e2e/common/host_path.go: fmt.Sprintf("--file_content_in_loop=%v", filePat
 test/e2e/common/host_path.go: fmt.Sprintf("--file_content_in_loop=%v", filePathInReader),
 test/e2e/common/host_path.go: fmt.Sprintf("--retry_time=%d", retryDuration),
 test/e2e/common/host_path.go: fmt.Sprintf("--retry_time=%d", retryDuration),
-test/e2e/es_cluster_logging.go: framework.Failf("No cluster_name field in Elasticsearch response: %v", esResponse)
-test/e2e/es_cluster_logging.go: // Check to see if have a cluster_name field.
-test/e2e/es_cluster_logging.go: clusterName, ok := esResponse["cluster_name"]
+test/e2e/cluster_logging_es.go: return fmt.Errorf("No cluster_name field in Elasticsearch response: %v", esResponse)
+test/e2e/cluster_logging_es.go: // Check to see if have a cluster_name field.
+test/e2e/cluster_logging_es.go: clusterName, ok := esResponse["cluster_name"]
 test/e2e_node/container_manager_test.go: return fmt.Errorf("expected pid %d's oom_score_adj to be %d; found %d", pid, expectedOOMScoreAdj, oomScore)
 test/e2e_node/container_manager_test.go: return fmt.Errorf("expected pid %d's oom_score_adj to be < %d; found %d", pid, expectedMaxOOMScoreAdj, oomScore)
 test/e2e_node/container_manager_test.go: return fmt.Errorf("expected pid %d's oom_score_adj to be >= %d; found %d", pid, expectedMinOOMScoreAdj, oomScore)
test/e2e/cluster_logging_es.go (new file, 288 lines)
@@ -0,0 +1,288 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
    "encoding/json"
    "fmt"
    "strconv"
    "strings"
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/labels"
    "k8s.io/kubernetes/test/e2e/framework"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
)

const (
    // graceTime is how long to keep retrying requesting elasticsearch for status information.
    graceTime = 5 * time.Minute
)

var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Feature:Elasticsearch]", func() {
    f := framework.NewDefaultFramework("es-logging")

    BeforeEach(func() {
        // TODO: For now assume we are only testing cluster logging with Elasticsearch
        // on GCE. Once we are sure that Elasticsearch cluster level logging
        // works for other providers we should widen this scope of this test.
        framework.SkipUnlessProviderIs("gce")
    })

    It("should check that logs from containers are ingested into Elasticsearch", func() {
        err := checkElasticsearchReadiness(f)
        framework.ExpectNoError(err, "Elasticsearch failed to start")

        By("Running synthetic logger")
        createSynthLogger(f, expectedLinesCount)
        defer f.PodClient().Delete(synthLoggerPodName, &api.DeleteOptions{})
        err = framework.WaitForPodSuccessInNamespace(f.Client, synthLoggerPodName, synthLoggerPodName, f.Namespace.Name)
        framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", synthLoggerPodName))

        By("Waiting for logs to ingest")
        totalMissing := expectedLinesCount
        for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
            totalMissing, err = getMissingLinesCountElasticsearch(f, expectedLinesCount)
            if err != nil {
                framework.Logf("Failed to get missing lines count due to %v", err)
                totalMissing = expectedLinesCount
            } else if totalMissing > 0 {
                framework.Logf("Still missing %d lines", totalMissing)
            }

            if totalMissing == 0 {
                break
            }
        }

        Expect(totalMissing).To(Equal(0), "Some log lines are still missing")
    })
})

// Ensures that elasticsearch is running and ready to serve requests
func checkElasticsearchReadiness(f *framework.Framework) error {
    // Check for the existence of the Elasticsearch service.
    By("Checking the Elasticsearch service exists.")
    s := f.Client.Services(api.NamespaceSystem)
    // Make a few attempts to connect. This makes the test robust against
    // being run as the first e2e test just after the e2e cluster has been created.
    var err error
    for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
        if _, err = s.Get("elasticsearch-logging"); err == nil {
            break
        }
        framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
    }
    Expect(err).NotTo(HaveOccurred())

    // Wait for the Elasticsearch pods to enter the running state.
    By("Checking to make sure the Elasticsearch pods are running")
    label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "elasticsearch-logging"}))
    options := api.ListOptions{LabelSelector: label}
    pods, err := f.Client.Pods(api.NamespaceSystem).List(options)
    Expect(err).NotTo(HaveOccurred())
    for _, pod := range pods.Items {
        err = framework.WaitForPodRunningInNamespace(f.Client, &pod)
        Expect(err).NotTo(HaveOccurred())
    }

    By("Checking to make sure we are talking to an Elasticsearch service.")
    // Perform a few checks to make sure this looks like an Elasticsearch cluster.
    var statusCode float64
    var esResponse map[string]interface{}
    err = nil
    var body []byte
    for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) {
        proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
        if errProxy != nil {
            framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
            continue
        }
        // Query against the root URL for Elasticsearch.
        body, err = proxyRequest.Namespace(api.NamespaceSystem).
            Name("elasticsearch-logging").
            DoRaw()
        if err != nil {
            framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
            continue
        }
        err = json.Unmarshal(body, &esResponse)
        if err != nil {
            framework.Logf("After %v failed to convert Elasticsearch JSON response %v to map[string]interface{}: %v", time.Since(start), string(body), err)
            continue
        }
        statusIntf, ok := esResponse["status"]
        if !ok {
            framework.Logf("After %v Elasticsearch response has no status field: %v", time.Since(start), esResponse)
            continue
        }
        statusCode, ok = statusIntf.(float64)
        if !ok {
            // Assume this is a string returning Failure. Retry.
            framework.Logf("After %v expected status to be a float64 but got %v of type %T", time.Since(start), statusIntf, statusIntf)
            continue
        }
        if int(statusCode) != 200 {
            framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode)
            continue
        }
        break
    }

    if err != nil {
        return err
    }

    if int(statusCode) != 200 {
        return fmt.Errorf("Elasticsearch cluster has a bad status: %v", statusCode)
    }

    // Check to see if have a cluster_name field.
    clusterName, ok := esResponse["cluster_name"]
    if !ok {
        return fmt.Errorf("No cluster_name field in Elasticsearch response: %v", esResponse)
    }

    if clusterName != "kubernetes-logging" {
        return fmt.Errorf("Connected to wrong cluster %q (expecting kubernetes_logging)", clusterName)
    }

    // Now assume we really are talking to an Elasticsearch instance.
    // Check the cluster health.
    By("Checking health of Elasticsearch service.")
    healthy := false
    for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
        proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
        if errProxy != nil {
            framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
            continue
        }
        body, err = proxyRequest.Namespace(api.NamespaceSystem).
            Name("elasticsearch-logging").
            Suffix("_cluster/health").
            Param("level", "indices").
            DoRaw()
        if err != nil {
            continue
        }

        var health map[string]interface{}
        err := json.Unmarshal(body, &health)
        if err != nil {
            framework.Logf("Bad json response from elasticsearch: %v", err)
            continue
        }
        statusIntf, ok := health["status"]
        if !ok {
            framework.Logf("No status field found in cluster health response: %v", health)
            continue
        }
        status := statusIntf.(string)
        if status != "green" && status != "yellow" {
            framework.Logf("Cluster health has bad status: %v", health)
            continue
        }
        if err == nil && ok {
            healthy = true
            break
        }
    }

    if !healthy {
        return fmt.Errorf("After %v elasticsearch cluster is not healthy", graceTime)
    }

    return nil
}

func getMissingLinesCountElasticsearch(f *framework.Framework, expectedCount int) (int, error) {
    proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
    if errProxy != nil {
        return 0, fmt.Errorf("Failed to get services proxy request: %v", errProxy)
    }

    // Ask Elasticsearch to return all the log lines that were tagged with the underscore
    // version of the name. Ask for twice as many log lines as we expect to check for
    // duplication bugs.
    body, err := proxyRequest.Namespace(api.NamespaceSystem).
        Name("elasticsearch-logging").
        Suffix("_search").
        // TODO: Change filter to only match records from current test run
        // after fluent-plugin-kubernetes_metadata_filter is enabled
        // and optimize current query
        Param("q", "tag:*synthlogger*").
        Param("size", strconv.Itoa(expectedCount)).
        DoRaw()
    if err != nil {
        return 0, fmt.Errorf("Failed to make proxy call to elasticsearch-logging: %v", err)
    }

    var response map[string]interface{}
    err = json.Unmarshal(body, &response)
    if err != nil {
        return 0, fmt.Errorf("Failed to unmarshal response: %v", err)
    }

    hits, ok := response["hits"].(map[string]interface{})
    if !ok {
        return 0, fmt.Errorf("response[hits] not of the expected type: %T", response["hits"])
    }

    h, ok := hits["hits"].([]interface{})
    if !ok {
        return 0, fmt.Errorf("Hits not of the expected type: %T", hits["hits"])
    }

    // Initialize data-structure for observing counts.
    counts := make(map[int]int)

    // Iterate over the hits and populate the observed array.
    for _, e := range h {
        l, ok := e.(map[string]interface{})
        if !ok {
            framework.Logf("Element of hit not of expected type: %T", e)
            continue
        }
        source, ok := l["_source"].(map[string]interface{})
        if !ok {
            framework.Logf("_source not of the expected type: %T", l["_source"])
            continue
        }
        msg, ok := source["log"].(string)
        if !ok {
            framework.Logf("Log not of the expected type: %T", source["log"])
            continue
        }
        lineNumber, err := strconv.Atoi(strings.TrimSpace(msg))
        if err != nil {
            framework.Logf("Log line %s is not a number", msg)
            continue
        }
        if lineNumber < 0 || lineNumber >= expectedCount {
            framework.Logf("Number %d is not valid, expected number from range [0, %d)", lineNumber, expectedCount)
            continue
        }
        // Record the observation of a log line
        // Duplicates are possible and fine, fluentd has at-least-once delivery
        counts[lineNumber]++
    }

    return expectedCount - len(counts), nil
}
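For reference (not part of this commit), the query that getMissingLinesCountElasticsearch issues through the services proxy can be reproduced by hand; a rough sketch, assuming kubectl proxy on localhost:8001 and the old-style services proxy path used by clusters of this era:

    kubectl proxy --port=8001 &
    curl "http://localhost:8001/api/v1/proxy/namespaces/kube-system/services/elasticsearch-logging/_search?q=tag:*synthlogger*&size=100"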
test/e2e/cluster_logging_gcl.go (new file, 130 lines)
@@ -0,0 +1,130 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
    "fmt"
    "os/exec"
    "strconv"
    "strings"
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/util/json"
    "k8s.io/kubernetes/test/e2e/framework"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
)

var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
    f := framework.NewDefaultFramework("gcl-logging")

    BeforeEach(func() {
        // TODO (crassirostris): Expand to GKE once the test is stable
        framework.SkipUnlessProviderIs("gce")
    })

    It("should check that logs from containers are ingested in GCL", func() {
        By("Running synthetic logger")
        createSynthLogger(f, expectedLinesCount)
        defer f.PodClient().Delete(synthLoggerPodName, &api.DeleteOptions{})
        err := framework.WaitForPodSuccessInNamespace(f.Client, synthLoggerPodName, synthLoggerPodName, f.Namespace.Name)
        framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", synthLoggerPodName))

        By("Waiting for logs to ingest")
        totalMissing := expectedLinesCount
        for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
            var err error
            totalMissing, err = getMissingLinesCountGcl(f, synthLoggerPodName, expectedLinesCount)
            if err != nil {
                framework.Logf("Failed to get missing lines count due to %v", err)
                totalMissing = expectedLinesCount
            } else if totalMissing > 0 {
                framework.Logf("Still missing %d lines", totalMissing)
            }

            if totalMissing == 0 {
                break
            }
        }

        Expect(totalMissing).To(Equal(0), "Some log lines are still missing")
    })
})

func getMissingLinesCountGcl(f *framework.Framework, podName string, expectedCount int) (int, error) {
    gclFilter := fmt.Sprintf("resource.labels.pod_id:%s AND resource.labels.namespace_id:%s", podName, f.Namespace.Name)
    entries, err := readFilteredEntriesFromGcl(gclFilter)
    if err != nil {
        return 0, err
    }

    occurrences := make(map[int]int)
    for _, entry := range entries {
        lineNumber, err := strconv.Atoi(strings.TrimSpace(entry))
        if err != nil {
            continue
        }
        if lineNumber < 0 || lineNumber >= expectedCount {
            framework.Logf("Unexpected line number: %d", lineNumber)
        } else {
            // Duplicates are possible and fine, fluentd has at-least-once delivery
            occurrences[lineNumber]++
        }
    }

    return expectedCount - len(occurrences), nil
}

type LogEntry struct {
    TextPayload string
}

// Since GCL API is not easily available from the outside of cluster
// we use gcloud command to perform search with filter
func readFilteredEntriesFromGcl(filter string) ([]string, error) {
    framework.Logf("Reading entries from GCL with filter '%v'", filter)
    argList := []string{"beta",
        "logging",
        "read",
        filter,
        "--format",
        "json",
        "--project",
        framework.TestContext.CloudConfig.ProjectID,
    }
    output, err := exec.Command("gcloud", argList...).CombinedOutput()
    if err != nil {
        return nil, err
    }

    var entries []*LogEntry
    if err = json.Unmarshal(output, &entries); err != nil {
        return nil, err
    }
    framework.Logf("Read %d entries from GCL", len(entries))

    var result []string
    for _, entry := range entries {
        if entry.TextPayload != "" {
            result = append(result, entry.TextPayload)
        }
    }

    return result, nil
}
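For reference (not part of this commit), readFilteredEntriesFromGcl above shells out to gcloud; the equivalent command line is roughly the following, with the test namespace and project ID left as placeholders:

    gcloud beta logging read \
      "resource.labels.pod_id:synthlogger AND resource.labels.namespace_id:<test-namespace>" \
      --format json --project <project-id>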
test/e2e/cluster_logging_utils.go (new file, 58 lines)
@@ -0,0 +1,58 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
    "fmt"
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/test/e2e/framework"
)

const (
    // ingestionTimeout is how long to keep retrying to wait for all the
    // logs to be ingested.
    ingestionTimeout = 10 * time.Minute
    // ingestionRetryDelay is how long test should wait between
    // two attempts to check for ingestion
    ingestionRetryDelay = 25 * time.Second

    synthLoggerPodName = "synthlogger"

    // expectedLinesCount is the number of log lines emitted (and checked) for each synthetic logging pod.
    expectedLinesCount = 100
)

func createSynthLogger(f *framework.Framework, linesCount int) {
    f.PodClient().Create(&api.Pod{
        ObjectMeta: api.ObjectMeta{
            Name:      synthLoggerPodName,
            Namespace: f.Namespace.Name,
        },
        Spec: api.PodSpec{
            Containers: []api.Container{
                {
                    Name:  synthLoggerPodName,
                    Image: "gcr.io/google_containers/busybox:1.24",
                    // notice: the subshell syntax is escaped with `$$`
                    Command: []string{"/bin/sh", "-c", fmt.Sprintf("i=0; while [ $i -lt %d ]; do echo $i; i=`expr $i + 1`; done", linesCount)},
                },
            },
        },
    })
}
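For reference (not part of this commit), with expectedLinesCount set to 100 the synthetic logger container above runs the shell loop below, so its stdout (and therefore the ingested log stream) is simply the integers 0 through 99, one per line:

    /bin/sh -c 'i=0; while [ $i -lt 100 ]; do echo $i; i=`expr $i + 1`; done'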
test/e2e/es_cluster_logging.go (deleted file, 473 lines)
@@ -1,473 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
    "encoding/json"
    "fmt"
    "strconv"
    "strings"
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/labels"
    "k8s.io/kubernetes/test/e2e/framework"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
)

var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Feature:Elasticsearch]", func() {
    f := framework.NewDefaultFramework("es-logging")

    BeforeEach(func() {
        // TODO: For now assume we are only testing cluster logging with Elasticsearch
        // on GCE. Once we are sure that Elasticsearch cluster level logging
        // works for other providers we should widen this scope of this test.
        framework.SkipUnlessProviderIs("gce")
    })

    It("should check that logs from pods on all nodes are ingested into Elasticsearch", func() {
        ClusterLevelLoggingWithElasticsearch(f)
    })
})

const (
    k8sAppKey    = "k8s-app"
    esValue      = "elasticsearch-logging"
    fluentdValue = "fluentd-logging"
)

func bodyToJSON(body []byte) (map[string]interface{}, error) {
    var r map[string]interface{}
    if err := json.Unmarshal(body, &r); err != nil {
        framework.Logf("Bad JSON: %s", string(body))
        return nil, fmt.Errorf("failed to unmarshal Elasticsearch response: %v", err)
    }
    return r, nil
}

func nodeInNodeList(nodeName string, nodeList *api.NodeList) bool {
    for _, node := range nodeList.Items {
        if nodeName == node.Name {
            return true
        }
    }
    return false
}

// ClusterLevelLoggingWithElasticsearch is an end to end test for cluster level logging.
func ClusterLevelLoggingWithElasticsearch(f *framework.Framework) {
    // graceTime is how long to keep retrying requests for status information.
    const graceTime = 5 * time.Minute
    // ingestionTimeout is how long to keep retrying to wait for all the
    // logs to be ingested.
    const ingestionTimeout = 10 * time.Minute

    // Check for the existence of the Elasticsearch service.
    By("Checking the Elasticsearch service exists.")
    s := f.Client.Services(api.NamespaceSystem)
    // Make a few attempts to connect. This makes the test robust against
    // being run as the first e2e test just after the e2e cluster has been created.
    var err error
    for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
        if _, err = s.Get("elasticsearch-logging"); err == nil {
            break
        }
        framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
    }
    Expect(err).NotTo(HaveOccurred())

    // Wait for the Elasticsearch pods to enter the running state.
    By("Checking to make sure the Elasticsearch pods are running")
    label := labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: esValue}))
    options := api.ListOptions{LabelSelector: label}
    pods, err := f.Client.Pods(api.NamespaceSystem).List(options)
    Expect(err).NotTo(HaveOccurred())
    for _, pod := range pods.Items {
        err = framework.WaitForPodRunningInNamespace(f.Client, &pod)
        Expect(err).NotTo(HaveOccurred())
    }

    By("Checking to make sure we are talking to an Elasticsearch service.")
    // Perform a few checks to make sure this looks like an Elasticsearch cluster.
    var statusCode float64
    var esResponse map[string]interface{}
    err = nil
    var body []byte
    for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) {
        proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
        if errProxy != nil {
            framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
            continue
        }
        // Query against the root URL for Elasticsearch.
        body, err = proxyRequest.Namespace(api.NamespaceSystem).
            Name("elasticsearch-logging").
            DoRaw()
        if err != nil {
            framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
            continue
        }
        esResponse, err = bodyToJSON(body)
        if err != nil {
            framework.Logf("After %v failed to convert Elasticsearch JSON response %v to map[string]interface{}: %v", time.Since(start), string(body), err)
            continue
        }
        statusIntf, ok := esResponse["status"]
        if !ok {
            framework.Logf("After %v Elasticsearch response has no status field: %v", time.Since(start), esResponse)
            continue
        }
        statusCode, ok = statusIntf.(float64)
        if !ok {
            // Assume this is a string returning Failure. Retry.
            framework.Logf("After %v expected status to be a float64 but got %v of type %T", time.Since(start), statusIntf, statusIntf)
            continue
        }
        if int(statusCode) != 200 {
            framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode)
            continue
        }
        break
    }
    Expect(err).NotTo(HaveOccurred())
    if int(statusCode) != 200 {
        framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode)
    }
    // Check to see if have a cluster_name field.
    clusterName, ok := esResponse["cluster_name"]
    if !ok {
        framework.Failf("No cluster_name field in Elasticsearch response: %v", esResponse)
    }
    if clusterName != "kubernetes-logging" {
        framework.Failf("Connected to wrong cluster %q (expecting kubernetes_logging)", clusterName)
    }

    // Now assume we really are talking to an Elasticsearch instance.
    // Check the cluster health.
    By("Checking health of Elasticsearch service.")
    healthy := false
    for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
        proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
        if errProxy != nil {
            framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
            continue
        }
        body, err = proxyRequest.Namespace(api.NamespaceSystem).
            Name("elasticsearch-logging").
            Suffix("_cluster/health").
            Param("level", "indices").
            DoRaw()
        if err != nil {
            continue
        }
        health, err := bodyToJSON(body)
        if err != nil {
            framework.Logf("Bad json response from elasticsearch: %v", err)
            continue
        }
        statusIntf, ok := health["status"]
        if !ok {
            framework.Logf("No status field found in cluster health response: %v", health)
            continue
        }
        status := statusIntf.(string)
        if status != "green" && status != "yellow" {
            framework.Logf("Cluster health has bad status: %v", health)
            continue
        }
        if err == nil && ok {
            healthy = true
            break
        }
    }
    if !healthy {
        framework.Failf("After %v elasticsearch cluster is not healthy", graceTime)
    }

    // Obtain a list of nodes so we can place one synthetic logger on each node.
    nodes := framework.GetReadySchedulableNodesOrDie(f.Client)
    nodeCount := len(nodes.Items)
    if nodeCount == 0 {
        framework.Failf("Failed to find any nodes")
    }
    framework.Logf("Found %d nodes.", len(nodes.Items))

    // Filter out unhealthy nodes.
    // Previous tests may have cause failures of some nodes. Let's skip
    // 'Not Ready' nodes, just in case (there is no need to fail the test).
    framework.FilterNodes(nodes, func(node api.Node) bool {
        return framework.IsNodeConditionSetAsExpected(&node, api.NodeReady, true)
    })
    if len(nodes.Items) < 2 {
        framework.Failf("Less than two nodes were found Ready: %d", len(nodes.Items))
    }
    framework.Logf("Found %d healthy nodes.", len(nodes.Items))

    // Wait for the Fluentd pods to enter the running state.
    By("Checking to make sure the Fluentd pod are running on each healthy node")
    label = labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: fluentdValue}))
    options = api.ListOptions{LabelSelector: label}
    fluentdPods, err := f.Client.Pods(api.NamespaceSystem).List(options)
    Expect(err).NotTo(HaveOccurred())
    for _, pod := range fluentdPods.Items {
        if nodeInNodeList(pod.Spec.NodeName, nodes) {
            err = framework.WaitForPodRunningInNamespace(f.Client, &pod)
            Expect(err).NotTo(HaveOccurred())
        }
    }

    // Check if each healthy node has fluentd running on it
    for _, node := range nodes.Items {
        exists := false
        for _, pod := range fluentdPods.Items {
            if pod.Spec.NodeName == node.Name {
                exists = true
                break
            }
        }
        if !exists {
            framework.Failf("Node %v does not have fluentd pod running on it.", node.Name)
        }
    }

    // Create a unique root name for the resources in this test to permit
    // parallel executions of this test.
    // Use a unique namespace for the resources created in this test.
    ns := f.Namespace.Name
    name := "synthlogger"
    // Form a unique name to taint log lines to be collected.
    // Replace '-' characters with '_' to prevent the analyzer from breaking apart names.
    taintName := strings.Replace(ns+name, "-", "_", -1)
    framework.Logf("Tainting log lines with %v", taintName)
    // podNames records the names of the synthetic logging pods that are created in the
    // loop below.
    var podNames []string
    // countTo is the number of log lines emitted (and checked) for each synthetic logging pod.
    const countTo = 100
    // Instantiate a synthetic logger pod on each node.
    for i, node := range nodes.Items {
        podName := fmt.Sprintf("%s-%d", name, i)
        _, err := f.Client.Pods(ns).Create(&api.Pod{
            ObjectMeta: api.ObjectMeta{
                Name:   podName,
                Labels: map[string]string{"name": name},
            },
            Spec: api.PodSpec{
                Containers: []api.Container{
                    {
                        Name:  "synth-logger",
                        Image: "gcr.io/google_containers/ubuntu:14.04",
                        // notice: the subshell syntax is escaped with `$$`
                        Command: []string{"bash", "-c", fmt.Sprintf("i=0; while ((i < %d)); do echo \"%d %s $i %s\"; i=$$(($i+1)); done", countTo, i, taintName, podName)},
                    },
                },
                NodeName:      node.Name,
                RestartPolicy: api.RestartPolicyNever,
            },
        })
        Expect(err).NotTo(HaveOccurred())
        podNames = append(podNames, podName)
    }

    // Cleanup the pods when we are done.
    defer func() {
        for _, pod := range podNames {
            if err = f.Client.Pods(ns).Delete(pod, nil); err != nil {
                framework.Logf("Failed to delete pod %s: %v", pod, err)
            }
        }
    }()

    // Wait for the synthetic logging pods to finish.
    By("Waiting for the pods to succeed.")
    for _, pod := range podNames {
        err = framework.WaitForPodSuccessInNamespace(f.Client, pod, "synth-logger", ns)
        Expect(err).NotTo(HaveOccurred())
    }

    // Make several attempts to observe the logs ingested into Elasticsearch.
    By("Checking all the log lines were ingested into Elasticsearch")
    totalMissing := 0
    expected := nodeCount * countTo
    missingPerNode := []int{}
    for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(25 * time.Second) {

        // Debugging code to report the status of the elasticsearch logging endpoints.
        selector := labels.Set{k8sAppKey: esValue}.AsSelector()
        options := api.ListOptions{LabelSelector: selector}
        esPods, err := f.Client.Pods(api.NamespaceSystem).List(options)
        if err != nil {
            framework.Logf("Attempt to list Elasticsearch nodes encountered a problem -- may retry: %v", err)
            continue
        } else {
            for i, pod := range esPods.Items {
                framework.Logf("pod %d: %s PodIP %s phase %s condition %+v", i, pod.Name, pod.Status.PodIP, pod.Status.Phase,
                    pod.Status.Conditions)
            }
        }

        proxyRequest, errProxy := framework.GetServicesProxyRequest(f.Client, f.Client.Get())
        if errProxy != nil {
            framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
            continue
        }
        // Ask Elasticsearch to return all the log lines that were tagged with the underscore
        // version of the name. Ask for twice as many log lines as we expect to check for
        // duplication bugs.
        body, err = proxyRequest.Namespace(api.NamespaceSystem).
            Name("elasticsearch-logging").
            Suffix("_search").
            Param("q", fmt.Sprintf("log:%s", taintName)).
            Param("size", strconv.Itoa(2*expected)).
            DoRaw()
        if err != nil {
            framework.Logf("After %v failed to make proxy call to elasticsearch-logging: %v", time.Since(start), err)
            continue
        }

        response, err := bodyToJSON(body)
        if err != nil {
            framework.Logf("After %v failed to unmarshal response: %v", time.Since(start), err)
            framework.Logf("Body: %s", string(body))
            continue
        }
        hits, ok := response["hits"].(map[string]interface{})
        if !ok {
            framework.Logf("response[hits] not of the expected type: %T", response["hits"])
            continue
        }
        totalF, ok := hits["total"].(float64)
        if !ok {
            framework.Logf("After %v hits[total] not of the expected type: %T", time.Since(start), hits["total"])
            continue
        }
        total := int(totalF)
        if total != expected {
            framework.Logf("After %v expecting to find %d log lines but saw %d", time.Since(start), expected, total)
        }
        h, ok := hits["hits"].([]interface{})
        if !ok {
            framework.Logf("After %v hits not of the expected type: %T", time.Since(start), hits["hits"])
            continue
        }
        // Initialize data-structure for observing counts.
        observed := make([][]int, nodeCount)
        for i := range observed {
            observed[i] = make([]int, countTo)
        }
        // Iterate over the hits and populate the observed array.
        for _, e := range h {
            l, ok := e.(map[string]interface{})
            if !ok {
                framework.Logf("element of hit not of expected type: %T", e)
                continue
            }
            source, ok := l["_source"].(map[string]interface{})
            if !ok {
                framework.Logf("_source not of the expected type: %T", l["_source"])
                continue
            }
            msg, ok := source["log"].(string)
            if !ok {
                framework.Logf("log not of the expected type: %T", source["log"])
                continue
            }
            words := strings.Split(msg, " ")
            if len(words) != 4 {
                framework.Logf("Malformed log line: %s", msg)
                continue
            }
            n, err := strconv.ParseUint(words[0], 10, 0)
            if err != nil {
                framework.Logf("Expecting numer of node as first field of %s", msg)
                continue
            }
            if n < 0 || int(n) >= nodeCount {
                framework.Logf("Node count index out of range: %d", nodeCount)
                continue
            }
            index, err := strconv.ParseUint(words[2], 10, 0)
            if err != nil {
                framework.Logf("Expecting number as third field of %s", msg)
                continue
            }
            if index < 0 || index >= countTo {
                framework.Logf("Index value out of range: %d", index)
                continue
            }
            if words[1] != taintName {
                framework.Logf("Elasticsearch query return unexpected log line: %s", msg)
                continue
            }
            // Record the observation of a log line from node n at the given index.
            observed[n][index]++
        }
        // Make sure we correctly observed the expected log lines from each node.
        totalMissing = 0
        missingPerNode = make([]int, nodeCount)
        incorrectCount := false
        for n := range observed {
            for i, c := range observed[n] {
                if c == 0 {
                    totalMissing++
                    missingPerNode[n]++
                }
                if c < 0 || c > 1 {
                    framework.Logf("Got incorrect count for node %d index %d: %d", n, i, c)
                    incorrectCount = true
                }
            }
        }
        if incorrectCount {
            framework.Logf("After %v es still return duplicated log lines", time.Since(start))
            continue
        }
        if totalMissing != 0 {
            framework.Logf("After %v still missing %d log lines", time.Since(start), totalMissing)
            continue
        }
        framework.Logf("After %s found all %d log lines", time.Since(start), expected)
        return
    }
    for n := range missingPerNode {
        if missingPerNode[n] > 0 {
            framework.Logf("Node %d %s is missing %d logs", n, nodes.Items[n].Name, missingPerNode[n])
            opts := &api.PodLogOptions{}
            body, err = f.Client.Pods(ns).GetLogs(podNames[n], opts).DoRaw()
            if err != nil {
                framework.Logf("Cannot get logs from pod %v", podNames[n])
                continue
            }
            framework.Logf("Pod %s has the following logs: %s", podNames[n], body)

            for _, pod := range fluentdPods.Items {
                if pod.Spec.NodeName == nodes.Items[n].Name {
                    body, err = f.Client.Pods(api.NamespaceSystem).GetLogs(pod.Name, opts).DoRaw()
                    if err != nil {
                        framework.Logf("Cannot get logs from pod %v", pod.Name)
                        break
                    }
                    framework.Logf("Fluentd Pod %s on node %s has the following logs: %s", pod.Name, nodes.Items[n].Name, body)
                    break
                }
            }
        }
    }
    framework.Failf("Failed to find all %d log lines", expected)
}
@@ -27,7 +27,7 @@ import (
 	. "github.com/onsi/gomega"
 )
 
-var _ = framework.KubeDescribe("Kibana Logging Instances Is Alive", func() {
+var _ = framework.KubeDescribe("Kibana Logging Instances Is Alive [Feature:Elasticsearch]", func() {
 	f := framework.NewDefaultFramework("kibana-logging")
 
 	BeforeEach(func() {
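Note (not part of this commit): tagging the Elasticsearch and Kibana suites with [Feature:Elasticsearch] keeps them out of default runs now that gcp is the default destination; they can still be selected, or explicitly skipped, through the e2e runner's ginkgo flags. A rough sketch, assuming the hack/e2e.go workflow:

    # run only the Elasticsearch-tagged logging suites
    go run hack/e2e.go -v --test --test_args="--ginkgo.focus=Feature:Elasticsearch"

    # or run the default suite and skip them
    go run hack/e2e.go -v --test --test_args="--ginkgo.skip=Feature:Elasticsearch"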