Refactor cluster logging tests and add load tests

Mik Vyatskov 2017-02-17 12:02:55 +01:00
parent df1465e99f
commit c025d771f5
8 changed files with 692 additions and 367 deletions

View File

@ -16,7 +16,10 @@ go_library(
"autoscaling_utils.go",
"cadvisor.go",
"cluster_logging_es.go",
"cluster_logging_es_utils.go",
"cluster_logging_gcl.go",
"cluster_logging_gcl_load.go",
"cluster_logging_gcl_utils.go",
"cluster_logging_utils.go",
"cluster_size_autoscaling.go",
"cluster_upgrade.go",
@ -178,8 +181,10 @@ go_library(
"//vendor:golang.org/x/crypto/ssh",
"//vendor:golang.org/x/net/context",
"//vendor:golang.org/x/net/websocket",
"//vendor:golang.org/x/oauth2/google",
"//vendor:google.golang.org/api/compute/v1",
"//vendor:google.golang.org/api/googleapi",
"//vendor:google.golang.org/api/logging/v2beta1",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
@ -189,7 +194,6 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/json",
"//vendor:k8s.io/apimachinery/pkg/util/net",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/sets",

View File

@ -17,24 +17,13 @@ limitations under the License.
package e2e
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
// graceTime is how long to keep retrying requesting elasticsearch for status information.
graceTime = 5 * time.Minute
)
var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Feature:Elasticsearch]", func() {
@ -48,241 +37,25 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
})
It("should check that logs from containers are ingested into Elasticsearch", func() {
err := checkElasticsearchReadiness(f)
framework.ExpectNoError(err, "Elasticsearch failed to start")
podName := "synthlogger"
esLogsProvider, err := newEsLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")
err = esLogsProvider.EnsureWorking()
framework.ExpectNoError(err, "Elasticsearch is not working")
By("Running synthetic logger")
createSynthLogger(f, expectedLinesCount)
defer f.PodClient().Delete(synthLoggerPodName, &metav1.DeleteOptions{})
err = framework.WaitForPodSuccessInNamespace(f.ClientSet, synthLoggerPodName, f.Namespace.Name)
framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", synthLoggerPodName))
pod := createLoggingPod(f, podName, 100, 1*time.Second)
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name)
framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName))
By("Waiting for logs to ingest")
totalMissing := expectedLinesCount
for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
totalMissing, err = getMissingLinesCountElasticsearch(f, expectedLinesCount)
if err != nil {
framework.Logf("Failed to get missing lines count due to %v", err)
totalMissing = expectedLinesCount
} else if totalMissing > 0 {
framework.Logf("Still missing %d lines", totalMissing)
}
err = waitForLogsIngestion(esLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0)
framework.ExpectNoError(err, "Failed to ingest logs")
if totalMissing == 0 {
break
}
if err != nil {
reportLogsFromFluentdPod(f, pod)
}
if totalMissing > 0 {
if err := reportLogsFromFluentdPod(f); err != nil {
framework.Logf("Failed to report logs from fluentd pod due to %v", err)
}
}
Expect(totalMissing).To(Equal(0), "Some log lines are still missing")
})
})
// Ensures that elasticsearch is running and ready to serve requests
func checkElasticsearchReadiness(f *framework.Framework) error {
// Check for the existence of the Elasticsearch service.
By("Checking the Elasticsearch service exists.")
s := f.ClientSet.Core().Services(metav1.NamespaceSystem)
// Make a few attempts to connect. This makes the test robust against
// being run as the first e2e test just after the e2e cluster has been created.
var err error
for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
if _, err = s.Get("elasticsearch-logging", metav1.GetOptions{}); err == nil {
break
}
framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
}
Expect(err).NotTo(HaveOccurred())
// Wait for the Elasticsearch pods to enter the running state.
By("Checking to make sure the Elasticsearch pods are running")
label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "elasticsearch-logging"}))
options := metav1.ListOptions{LabelSelector: label.String()}
pods, err := f.ClientSet.Core().Pods(metav1.NamespaceSystem).List(options)
Expect(err).NotTo(HaveOccurred())
for _, pod := range pods.Items {
err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod)
Expect(err).NotTo(HaveOccurred())
}
By("Checking to make sure we are talking to an Elasticsearch service.")
// Perform a few checks to make sure this looks like an Elasticsearch cluster.
var statusCode int
err = nil
var body []byte
for start := time.Now(); time.Since(start) < graceTime; time.Sleep(10 * time.Second) {
proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
if errProxy != nil {
framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
continue
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
// Query against the root URL for Elasticsearch.
response := proxyRequest.Namespace(metav1.NamespaceSystem).
Context(ctx).
Name("elasticsearch-logging").
Do()
err = response.Error()
response.StatusCode(&statusCode)
if err != nil {
if ctx.Err() != nil {
framework.Failf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
continue
}
framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
continue
}
if int(statusCode) != 200 {
framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode)
continue
}
break
}
Expect(err).NotTo(HaveOccurred())
if int(statusCode) != 200 {
framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode)
}
// Now assume we really are talking to an Elasticsearch instance.
// Check the cluster health.
By("Checking health of Elasticsearch service.")
healthy := false
for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) {
proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
if errProxy != nil {
framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
continue
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
body, err = proxyRequest.Namespace(metav1.NamespaceSystem).
Context(ctx).
Name("elasticsearch-logging").
Suffix("_cluster/health").
Param("level", "indices").
DoRaw()
if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to get cluster health from elasticsearch: %v", err)
}
continue
}
health := make(map[string]interface{})
err := json.Unmarshal(body, &health)
if err != nil {
framework.Logf("Bad json response from elasticsearch: %v", err)
continue
}
statusIntf, ok := health["status"]
if !ok {
framework.Logf("No status field found in cluster health response: %v", health)
continue
}
status := statusIntf.(string)
if status != "green" && status != "yellow" {
framework.Logf("Cluster health has bad status: %v", health)
continue
}
if err == nil && ok {
healthy = true
break
}
}
if !healthy {
return fmt.Errorf("After %v elasticsearch cluster is not healthy", graceTime)
}
return nil
}
func getMissingLinesCountElasticsearch(f *framework.Framework, expectedCount int) (int, error) {
proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
if errProxy != nil {
return 0, fmt.Errorf("Failed to get services proxy request: %v", errProxy)
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
// Ask Elasticsearch to return all the log lines that were tagged with the
// pod name. Ask for ten times as many log lines because duplication is possible.
body, err := proxyRequest.Namespace(metav1.NamespaceSystem).
Context(ctx).
Name("elasticsearch-logging").
Suffix("_search").
// TODO: Change filter to only match records from current test run
// after fluent-plugin-kubernetes_metadata_filter is enabled
// and optimize current query
Param("q", fmt.Sprintf("tag:*%s*", synthLoggerPodName)).
Param("size", strconv.Itoa(expectedCount*10)).
DoRaw()
if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to make proxy call to elasticsearch-logging: %v", err)
}
return 0, fmt.Errorf("Failed to make proxy call to elasticsearch-logging: %v", err)
}
var response map[string]interface{}
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("Failed to unmarshal response: %v", err)
}
hits, ok := response["hits"].(map[string]interface{})
if !ok {
return 0, fmt.Errorf("response[hits] not of the expected type: %T", response["hits"])
}
h, ok := hits["hits"].([]interface{})
if !ok {
return 0, fmt.Errorf("Hits not of the expected type: %T", hits["hits"])
}
// Initialize data-structure for observing counts.
counts := make(map[int]int)
// Iterate over the hits and populate the observed array.
for _, e := range h {
l, ok := e.(map[string]interface{})
if !ok {
framework.Logf("Element of hit not of expected type: %T", e)
continue
}
source, ok := l["_source"].(map[string]interface{})
if !ok {
framework.Logf("_source not of the expected type: %T", l["_source"])
continue
}
msg, ok := source["log"].(string)
if !ok {
framework.Logf("Log not of the expected type: %T", source["log"])
continue
}
lineNumber, err := strconv.Atoi(strings.TrimSpace(msg))
if err != nil {
framework.Logf("Log line %s is not a number", msg)
continue
}
if lineNumber < 0 || lineNumber >= expectedCount {
framework.Logf("Number %d is not valid, expected number from range [0, %d)", lineNumber, expectedCount)
continue
}
// Record the observation of a log line
// Duplicates are possible and fine, fluentd has at-least-once delivery
counts[lineNumber]++
}
return expectedCount - len(counts), nil
}

View File

@ -0,0 +1,240 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"encoding/json"
"fmt"
"strconv"
"time"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
// esRetryTimeout is how long to keep retrying requesting elasticsearch for status information.
esRetryTimeout = 5 * time.Minute
// esRetryDelay is how much time to wait between two attempts to send a request to elasticsearch
esRetryDelay = 5 * time.Second
)
type esLogsProvider struct {
Framework *framework.Framework
}
func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) {
return &esLogsProvider{Framework: f}, nil
}
// Ensures that elasticsearch is running and ready to serve requests
func (logsProvider *esLogsProvider) EnsureWorking() error {
f := logsProvider.Framework
// Check for the existence of the Elasticsearch service.
By("Checking the Elasticsearch service exists.")
s := f.ClientSet.Core().Services(api.NamespaceSystem)
// Make a few attempts to connect. This makes the test robust against
// being run as the first e2e test just after the e2e cluster has been created.
var err error
for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
if _, err = s.Get("elasticsearch-logging", meta_v1.GetOptions{}); err == nil {
break
}
framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start))
}
Expect(err).NotTo(HaveOccurred())
// Wait for the Elasticsearch pods to enter the running state.
By("Checking to make sure the Elasticsearch pods are running")
labelSelector := fields.SelectorFromSet(fields.Set(map[string]string{"k8s-app": "elasticsearch-logging"})).String()
options := meta_v1.ListOptions{LabelSelector: labelSelector}
pods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
Expect(err).NotTo(HaveOccurred())
for _, pod := range pods.Items {
err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod)
Expect(err).NotTo(HaveOccurred())
}
By("Checking to make sure we are talking to an Elasticsearch service.")
// Perform a few checks to make sure this looks like an Elasticsearch cluster.
var statusCode int
err = nil
var body []byte
for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
if errProxy != nil {
framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
continue
}
// Query against the root URL for Elasticsearch.
response := proxyRequest.Namespace(api.NamespaceSystem).
Name("elasticsearch-logging").
Do()
err = response.Error()
response.StatusCode(&statusCode)
if err != nil {
framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
continue
}
if int(statusCode) != 200 {
framework.Logf("After %v Elasticsearch cluster has a bad status: %v", time.Since(start), statusCode)
continue
}
break
}
Expect(err).NotTo(HaveOccurred())
if int(statusCode) != 200 {
framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode)
}
// Now assume we really are talking to an Elasticsearch instance.
// Check the cluster health.
By("Checking health of Elasticsearch service.")
healthy := false
for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) {
proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
if errProxy != nil {
framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
continue
}
body, err = proxyRequest.Namespace(api.NamespaceSystem).
Name("elasticsearch-logging").
Suffix("_cluster/health").
Param("level", "indices").
DoRaw()
if err != nil {
continue
}
health := make(map[string]interface{})
err := json.Unmarshal(body, &health)
if err != nil {
framework.Logf("Bad json response from elasticsearch: %v", err)
continue
}
statusIntf, ok := health["status"]
if !ok {
framework.Logf("No status field found in cluster health response: %v", health)
continue
}
status := statusIntf.(string)
if status != "green" && status != "yellow" {
framework.Logf("Cluster health has bad status: %v", health)
continue
}
if err == nil && ok {
healthy = true
break
}
}
if !healthy {
return fmt.Errorf("after %v elasticsearch cluster is not healthy", esRetryTimeout)
}
return nil
}
func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
f := logsProvider.Framework
proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get())
if errProxy != nil {
framework.Logf("Failed to get services proxy request: %v", errProxy)
return nil
}
// Ask Elasticsearch to return all the log lines that were tagged with the
// pod name. Ask for ten times as many log lines because duplication is possible.
body, err := proxyRequest.Namespace(api.NamespaceSystem).
Name("elasticsearch-logging").
Suffix("_search").
// TODO: Change filter to only match records from current test run
// after fluent-plugin-kubernetes_metadata_filter is enabled
// and optimize current query
Param("q", fmt.Sprintf("tag:*%s*", pod.Name)).
// Ask for more in case we included some unrelated records in our query
Param("size", strconv.Itoa(pod.ExpectedLinesNumber*10)).
DoRaw()
if err != nil {
framework.Logf("Failed to make proxy call to elasticsearch-logging: %v", err)
return nil
}
var response map[string]interface{}
err = json.Unmarshal(body, &response)
if err != nil {
framework.Logf("Failed to unmarshal response: %v", err)
return nil
}
hits, ok := response["hits"].(map[string]interface{})
if !ok {
framework.Logf("response[hits] not of the expected type: %T", response["hits"])
return nil
}
h, ok := hits["hits"].([]interface{})
if !ok {
framework.Logf("Hits not of the expected type: %T", hits["hits"])
return nil
}
entries := []*logEntry{}
// Iterate over the hits and populate the observed array.
for _, e := range h {
l, ok := e.(map[string]interface{})
if !ok {
framework.Logf("Element of hit not of expected type: %T", e)
continue
}
source, ok := l["_source"].(map[string]interface{})
if !ok {
framework.Logf("_source not of the expected type: %T", l["_source"])
continue
}
msg, ok := source["log"].(string)
if !ok {
framework.Logf("Log not of the expected type: %T", source["log"])
continue
}
timestampString, ok := source["@timestamp"].(string)
if !ok {
framework.Logf("Timestamp not of the expected type: %T", source["@timestamp"])
continue
}
timestamp, err := time.Parse(time.RFC3339, timestampString)
if err != nil {
framework.Logf("Timestamp was not in correct format: %s", timestampString)
continue
}
entries = append(entries, &logEntry{
Payload: msg,
Timestamp: timestamp,
})
}
return entries
}

View File

@ -18,17 +18,12 @@ package e2e
import (
"fmt"
"os/exec"
"strconv"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/json"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
@ -39,97 +34,26 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL", func() {
})
It("should check that logs from containers are ingested in GCL", func() {
podName := "synthlogger"
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")
err = gclLogsProvider.EnsureWorking()
framework.ExpectNoError(err, "GCL is not working")
By("Running synthetic logger")
createSynthLogger(f, expectedLinesCount)
defer f.PodClient().Delete(synthLoggerPodName, &metav1.DeleteOptions{})
err := framework.WaitForPodSuccessInNamespace(f.ClientSet, synthLoggerPodName, f.Namespace.Name)
framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", synthLoggerPodName))
pod := createLoggingPod(f, podName, 100, 1*time.Second)
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name)
framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName))
By("Waiting for logs to ingest")
totalMissing := expectedLinesCount
for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
var err error
totalMissing, err = getMissingLinesCountGcl(f, synthLoggerPodName, expectedLinesCount)
if err != nil {
framework.Logf("Failed to get missing lines count due to %v", err)
totalMissing = expectedLinesCount
} else if totalMissing > 0 {
framework.Logf("Still missing %d lines", totalMissing)
}
err = waitForLogsIngestion(gclLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0)
framework.ExpectNoError(err, "Failed to ingest logs")
if totalMissing == 0 {
break
}
if err != nil {
reportLogsFromFluentdPod(f, pod)
}
if totalMissing > 0 {
if err := reportLogsFromFluentdPod(f); err != nil {
framework.Logf("Failed to report logs from fluentd pod due to %v", err)
}
}
Expect(totalMissing).To(Equal(0), "Some log lines are still missing")
})
})
func getMissingLinesCountGcl(f *framework.Framework, podName string, expectedCount int) (int, error) {
gclFilter := fmt.Sprintf("resource.labels.pod_id:%s AND resource.labels.namespace_id:%s", podName, f.Namespace.Name)
entries, err := readFilteredEntriesFromGcl(gclFilter)
if err != nil {
return 0, err
}
occurrences := make(map[int]int)
for _, entry := range entries {
lineNumber, err := strconv.Atoi(strings.TrimSpace(entry))
if err != nil {
continue
}
if lineNumber < 0 || lineNumber >= expectedCount {
framework.Logf("Unexpected line number: %d", lineNumber)
} else {
// Duplicates are possible and fine, fluentd has at-least-once delivery
occurrences[lineNumber]++
}
}
return expectedCount - len(occurrences), nil
}
type LogEntry struct {
TextPayload string
}
// Since GCL API is not easily available from the outside of cluster
// we use gcloud command to perform search with filter
func readFilteredEntriesFromGcl(filter string) ([]string, error) {
framework.Logf("Reading entries from GCL with filter '%v'", filter)
argList := []string{"beta",
"logging",
"read",
filter,
"--format",
"json",
"--project",
framework.TestContext.CloudConfig.ProjectID,
}
output, err := exec.Command("gcloud", argList...).CombinedOutput()
if err != nil {
return nil, err
}
var entries []*LogEntry
if err = json.Unmarshal(output, &entries); err != nil {
return nil, err
}
framework.Logf("Read %d entries from GCL", len(entries))
var result []string
for _, entry := range entries {
if entry.TextPayload != "" {
result = append(result, entry.TextPayload)
}
}
return result, nil
}

View File

@ -0,0 +1,105 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"strconv"
"time"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
)
const (
// TODO(crassirostris): Once the test is stable, decrease the allowed losses
loadTestMaxAllowedLostFraction = 0.1
)
var _ = framework.KubeDescribe("Cluster level logging using GCL [Slow] [Flaky]", func() {
f := framework.NewDefaultFramework("gcl-logging-load")
It("should create a constant load with long-living pods and ensure logs delivery", func() {
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")
podCount := 30
loggingDuration := 10 * time.Minute
linesPerSecond := 1000
linesPerPod := linesPerSecond * int(loggingDuration.Seconds()) / podCount
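// With the values above: 1000 lines/s * 600 s / 30 pods = 20000 lines per pod.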
ingestionTimeout := 1 * time.Hour
By("Running logs generator pods")
pods := []*loggingPod{}
for podIdx := 0; podIdx < podCount; podIdx++ {
podName := f.Namespace.Name + "-logs-generator-" + strconv.Itoa(linesPerPod) + "-" + strconv.Itoa(podIdx)
pods = append(pods, createLoggingPod(f, podName, linesPerPod, loggingDuration))
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
}
By("Waiting for pods to succeed")
time.Sleep(loggingDuration)
By("Waiting for all log lines to be ingested")
err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction)
if err != nil {
framework.Failf("Failed to ingest logs: %v", err)
} else {
framework.Logf("Successfully ingested all logs")
}
})
It("should create a constant load with short-living pods and ensure logs delivery", func() {
gclLogsProvider, err := newGclLogsProvider(f)
framework.ExpectNoError(err, "Failed to create GCL logs provider")
maxPodCount := 10
jobDuration := 1 * time.Minute
linesPerPodPerSecond := 10
testDuration := 1 * time.Hour
ingestionTimeout := 1 * time.Hour
podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount))
podRunCount := int(testDuration.Seconds())/int(podRunDelay.Seconds()) - 1
linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds())
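// With the values above a new pod starts every 6 s (60 s / 10 pods), 599 pods are
// started over the hour (3600 s / 6 s - 1), and each pod emits 600 lines (10 lines/s * 60 s).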
By("Running short-living pods")
pods := []*loggingPod{}
for i := 0; i < podRunCount; i++ {
podName := f.Namespace.Name + "-job-logs-generator-" +
strconv.Itoa(maxPodCount) + "-" + strconv.Itoa(linesPerPod) + "-" + strconv.Itoa(i)
pods = append(pods, createLoggingPod(f, podName, linesPerPod, jobDuration))
defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{})
time.Sleep(podRunDelay)
}
By("Waiting for the last pods to finish")
time.Sleep(jobDuration)
By("Waiting for all log lines to be ingested")
err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction)
if err != nil {
framework.Failf("Failed to ingest logs: %v", err)
} else {
framework.Logf("Successfully ingested all logs")
}
})
})

View File

@ -0,0 +1,127 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"time"
"k8s.io/kubernetes/test/e2e/framework"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
gcl "google.golang.org/api/logging/v2beta1"
)
const (
// GCL doesn't support page sizes larger than 1000
gclPageSize = 1000
// If we fail to get a response from GCL, it can be a transient 500 or an
// exceeded quota limit, so we retry for a while in case the problem goes away.
// Quota is enforced every 100 seconds, so we have to wait for more than
// that to reliably get the next portion.
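// The 200-second retry timeout below therefore spans two full quota windows,
// so at least one quota refresh happens during the retry loop.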
queryGclRetryDelay = 10 * time.Second
queryGclRetryTimeout = 200 * time.Second
)
type gclLogsProvider struct {
GclService *gcl.Service
Framework *framework.Framework
}
func (gclLogsProvider *gclLogsProvider) EnsureWorking() error {
// We assume that GCL is always working
return nil
}
func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
ctx := context.Background()
hc, err := google.DefaultClient(ctx, gcl.CloudPlatformScope)
gclService, err := gcl.New(hc)
if err != nil {
return nil, err
}
provider := &gclLogsProvider{
GclService: gclService,
Framework: f,
}
return provider, nil
}
// ReadEntries reads, through the GCL API, all log entries for the given pod
// that were written at or after the pod's last known timestamp.
func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
filter := fmt.Sprintf("resource.labels.pod_id=%s AND resource.labels.namespace_id=%s AND timestamp>=\"%v\"",
pod.Name, gclLogsProvider.Framework.Namespace.Name, pod.LastTimestamp.Format(time.RFC3339))
framework.Logf("Reading entries from GCL with filter '%v'", filter)
response := getResponseSafe(gclLogsProvider.GclService, filter, "")
var entries []*logEntry
for response != nil && len(response.Entries) > 0 {
framework.Logf("Received %d entries from GCL", len(response.Entries))
for _, entry := range response.Entries {
if entry.TextPayload == "" {
continue
}
timestamp, parseErr := time.Parse(time.RFC3339, entry.Timestamp)
if parseErr != nil {
continue
}
entries = append(entries, &logEntry{
Timestamp: timestamp,
Payload: entry.TextPayload,
})
}
nextToken := response.NextPageToken
if nextToken == "" {
break
}
response = getResponseSafe(gclLogsProvider.GclService, filter, response.NextPageToken)
}
return entries
}
func getResponseSafe(gclService *gcl.Service, filter string, pageToken string) *gcl.ListLogEntriesResponse {
for start := time.Now(); time.Since(start) < queryGclRetryTimeout; time.Sleep(queryGclRetryDelay) {
response, err := gclService.Entries.List(&gcl.ListLogEntriesRequest{
ProjectIds: []string{
framework.TestContext.CloudConfig.ProjectID,
},
OrderBy: "timestamp desc",
Filter: filter,
PageSize: int64(gclPageSize),
PageToken: pageToken,
}).Do()
if err == nil {
return response
}
framework.Logf("Failed to get response from GCL due to %v, retrying", err)
}
return nil
}

View File

@ -19,52 +19,202 @@ package e2e
import (
"errors"
"fmt"
"strconv"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/api/resource"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/api"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/test/e2e/framework"
)
const (
// ingestionTimeout is how long to keep retrying to wait for all the
// logs to be ingested.
ingestionTimeout = 10 * time.Minute
// ingestionRetryDelay is how long test should wait between
// two attempts to check for ingestion
ingestionRetryDelay = 25 * time.Second
// Duration of delay between any two attempts to check if all logs are ingested
ingestionRetryDelay = 10 * time.Second
synthLoggerPodName = "synthlogger"
// Amount of CPU requested for the logging container, in millicores
loggingContainerCpuRequest = 10
// expectedLinesCount is the number of log lines emitted (and checked) for each synthetic logging pod.
expectedLinesCount = 100
// Amount of memory requested for the logging container, in bytes
loggingContainerMemoryRequest = 10 * 1024 * 1024
)
func createSynthLogger(f *framework.Framework, linesCount int) {
f.PodClient().Create(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: synthLoggerPodName,
Namespace: f.Namespace.Name,
// loggingPod tracks the progress of a single log-generating pod
type loggingPod struct {
// Name of the pod
Name string
// Timestamps of any entries that have not been read yet
// are guaranteed to be no earlier than this timestamp.
// Effectively, this is the timestamp of the last ingested entry
// before which no entry is missing.
LastTimestamp time.Time
// Cache of ingested and read entries
Occurrences map[int]*logEntry
// Number of lines expected to be ingested from this pod
ExpectedLinesNumber int
}
type logEntry struct {
Payload string
Timestamp time.Time
}
type logsProvider interface {
EnsureWorking() error
ReadEntries(*loggingPod) []*logEntry
}
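// getLogEntryNumber parses the sequence number from a generated log line.
// It assumes each line emitted by the logs-generator image starts with its
// line number as the first space-separated token, so a payload like
// "42 GET /index.html 200" yields (42, true) and a non-numeric token yields (0, false).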
func (entry *logEntry) getLogEntryNumber() (int, bool) {
chunks := strings.Split(entry.Payload, " ")
lineNumber, err := strconv.Atoi(strings.TrimSpace(chunks[0]))
return lineNumber, err == nil
}
func createLoggingPod(f *framework.Framework, podName string, totalLines int, loggingDuration time.Duration) *loggingPod {
framework.Logf("Starting pod %s", podName)
createLogsGeneratorPod(f, podName, totalLines, loggingDuration)
return &loggingPod{
Name: podName,
// It's used to avoid querying logs from before the pod was started
LastTimestamp: time.Now(),
Occurrences: make(map[int]*logEntry),
ExpectedLinesNumber: totalLines,
}
}
func createLogsGeneratorPod(f *framework.Framework, podName string, linesCount int, duration time.Duration) {
f.PodClient().Create(&api_v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyOnFailure,
Containers: []v1.Container{
Spec: api_v1.PodSpec{
RestartPolicy: api_v1.RestartPolicyNever,
Containers: []api_v1.Container{
{
Name: synthLoggerPodName,
Image: "gcr.io/google_containers/busybox:1.24",
// notice: the subshell syntax is escaped with `$$`
Command: []string{"/bin/sh", "-c", fmt.Sprintf("i=0; while [ $i -lt %d ]; do echo $i; i=`expr $i + 1`; done", linesCount)},
Name: podName,
Image: "gcr.io/google_containers/logs-generator:v0.1.0",
Env: []api_v1.EnvVar{
{
Name: "LOGS_GENERATOR_LINES_TOTAL",
Value: strconv.Itoa(linesCount),
},
{
Name: "LOGS_GENERATOR_DURATION",
Value: duration.String(),
},
},
Resources: api_v1.ResourceRequirements{
Requests: api_v1.ResourceList{
api_v1.ResourceCPU: *resource.NewMilliQuantity(
loggingContainerCpuRequest,
resource.DecimalSI),
api_v1.ResourceMemory: *resource.NewQuantity(
loggingContainerMemoryRequest,
resource.BinarySI),
},
},
},
},
},
})
}
func reportLogsFromFluentdPod(f *framework.Framework) error {
synthLoggerPod, err := f.PodClient().Get(synthLoggerPodName, metav1.GetOptions{})
func waitForLogsIngestion(logsProvider logsProvider, pods []*loggingPod, ingestionTimeout time.Duration, maxAllowedLostFraction float64) error {
expectedLinesNumber := 0
for _, pod := range pods {
expectedLinesNumber += pod.ExpectedLinesNumber
}
totalMissing := expectedLinesNumber
missingByPod := make([]int, len(pods))
for podIdx, pod := range pods {
missingByPod[podIdx] = pod.ExpectedLinesNumber
}
for start := time.Now(); totalMissing > 0 && time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
missing := 0
for podIdx, pod := range pods {
if missingByPod[podIdx] == 0 {
continue
}
missingByPod[podIdx] = pullMissingLogsCount(logsProvider, pod)
missing += missingByPod[podIdx]
}
totalMissing = missing
if totalMissing > 0 {
framework.Logf("Still missing %d lines in total", totalMissing)
}
}
lostFraction := float64(totalMissing) / float64(expectedLinesNumber)
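// For example, 600 missing lines out of 600000 expected gives a lost fraction
// of 0.001 (0.1%), well within the 10% allowed by the load tests.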
if totalMissing > 0 {
framework.Logf("After %v still missing %d lines, %.2f%% of total number oflines",
ingestionTimeout, totalMissing, lostFraction*100)
}
if lostFraction > maxAllowedLostFraction {
return fmt.Errorf("lost %.2f%% of lines, but only loss of %.2f%% can be tolerated",
lostFraction*100, maxAllowedLostFraction*100)
}
return nil
}
func pullMissingLogsCount(logsProvider logsProvider, pod *loggingPod) int {
missingOnPod, err := getMissingLinesCount(logsProvider, pod)
if err != nil {
return fmt.Errorf("Failed to get synth logger pod due to %v", err)
framework.Logf("Failed to get missing lines count from pod %s due to %v", pod.Name, err)
return pod.ExpectedLinesNumber
} else if missingOnPod > 0 {
framework.Logf("Pod %s is missing %d lines", pod.Name, missingOnPod)
} else {
framework.Logf("All logs from pod %s are ingested", pod.Name)
}
return missingOnPod
}
func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, error) {
entries := logsProvider.ReadEntries(pod)
for _, entry := range entries {
lineNumber, ok := entry.getLogEntryNumber()
if !ok {
continue
}
if lineNumber < 0 || lineNumber >= pod.ExpectedLinesNumber {
framework.Logf("Unexpected line number: %d", lineNumber)
} else {
pod.Occurrences[lineNumber] = entry
}
}
for i := 0; i < pod.ExpectedLinesNumber; i++ {
entry, ok := pod.Occurrences[i]
if !ok {
break
}
if entry.Timestamp.After(pod.LastTimestamp) {
pod.LastTimestamp = entry.Timestamp
}
}
return pod.ExpectedLinesNumber - len(pod.Occurrences), nil
}
func reportLogsFromFluentdPod(f *framework.Framework, pod *loggingPod) error {
synthLoggerPod, err := f.PodClient().Get(pod.Name, meta_v1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get synth logger pod due to %v", err)
}
synthLoggerNodeName := synthLoggerPod.Spec.NodeName
@ -73,20 +223,20 @@ func reportLogsFromFluentdPod(f *framework.Framework) error {
}
label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "fluentd-logging"}))
options := metav1.ListOptions{LabelSelector: label.String()}
fluentdPods, err := f.ClientSet.Core().Pods(metav1.NamespaceSystem).List(options)
options := meta_v1.ListOptions{LabelSelector: label.String()}
fluentdPods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
for _, fluentdPod := range fluentdPods.Items {
if fluentdPod.Spec.NodeName == synthLoggerNodeName {
containerName := fluentdPod.Spec.Containers[0].Name
logs, err := framework.GetPodLogs(f.ClientSet, metav1.NamespaceSystem, fluentdPod.Name, containerName)
logs, err := framework.GetPodLogs(f.ClientSet, meta_v1.NamespaceSystem, fluentdPod.Name, containerName)
if err != nil {
return fmt.Errorf("Failed to get logs from fluentd pod %s due to %v", fluentdPod.Name, err)
return fmt.Errorf("failed to get logs from fluentd pod %s due to %v", fluentdPod.Name, err)
}
framework.Logf("Logs from fluentd pod %s:\n%s", fluentdPod.Name, logs)
return nil
}
}
return fmt.Errorf("Failed to find fluentd pod running on node %s", synthLoggerNodeName)
return fmt.Errorf("failed to find fluentd pod running on node %s", synthLoggerNodeName)
}

View File

@ -11,6 +11,8 @@ Cassandra should create and scale cassandra,fabioy,1,apps,
CassandraStatefulSet should create statefulset,wojtek-t,1,apps,
Cluster level logging using Elasticsearch should check that logs from containers are ingested into Elasticsearch,crassirostris,0,instrumentation,
Cluster level logging using GCL should check that logs from containers are ingested in GCL,crassirostris,0,instrumentation,
Cluster level logging using GCL should create a constant load with long-living pods and ensure logs delivery,crassirostris,0,instrumentation,
Cluster level logging using GCL should create a constant load with short-living pods and ensure logs delivery,crassirostris,0,instrumentation,
Cluster size autoscaling should add node to the particular mig,spxtr,1,autoscaling,
Cluster size autoscaling should correctly scale down after a node is not needed,pmorie,1,autoscaling,
Cluster size autoscaling should correctly scale down after a node is not needed when there is non autoscaled pool,krousey,1,autoscaling,
