From 86a2ac9433618a5ffd880f984cd8e0f535c408b2 Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Mon, 31 Jul 2017 19:21:48 +0200 Subject: [PATCH] Refactor logging e2e tests, add new checks --- test/e2e/BUILD | 3 +- test/e2e/e2e_test.go | 3 +- test/e2e/instrumentation/BUILD | 8 +- test/e2e/instrumentation/common/BUILD | 28 ++ .../instrumentation/{ => common}/framework.go | 3 +- test/e2e/instrumentation/imports.go | 22 + test/e2e/instrumentation/logging/BUILD | 31 +- .../logging/elasticsearch/BUILD | 43 ++ .../logging/{es.go => elasticsearch/basic.go} | 45 +- .../{es_kibana.go => elasticsearch/kibana.go} | 70 ++-- .../{es_utils.go => elasticsearch/utils.go} | 72 ++-- .../instrumentation/logging/generic_soak.go | 2 +- test/e2e/instrumentation/logging/imports.go | 22 + test/e2e/instrumentation/logging/sd.go | 66 --- test/e2e/instrumentation/logging/sd_events.go | 94 ----- test/e2e/instrumentation/logging/sd_soak.go | 112 ----- test/e2e/instrumentation/logging/sd_utils.go | 356 ---------------- .../instrumentation/logging/stackdrvier/BUILD | 44 ++ .../logging/stackdrvier/basic.go | 152 +++++++ .../logging/stackdrvier/soak.go | 101 +++++ .../logging/stackdrvier/utils.go | 388 ++++++++++++++++++ test/e2e/instrumentation/logging/utils.go | 320 --------------- test/e2e/instrumentation/logging/utils/BUILD | 45 ++ .../logging/utils/log_provider.go | 25 ++ .../logging/utils/logging_agent.go | 86 ++++ .../logging/utils/logging_pod.go | 188 +++++++++ .../e2e/instrumentation/logging/utils/misc.go | 32 ++ .../instrumentation/logging/utils/types.go | 106 +++++ .../e2e/instrumentation/logging/utils/wait.go | 204 +++++++++ test/e2e/instrumentation/monitoring/BUILD | 2 +- .../instrumentation/monitoring/cadvisor.go | 2 +- .../instrumentation/monitoring/influxdb.go | 2 +- .../instrumentation/monitoring/stackdriver.go | 2 +- 33 files changed, 1607 insertions(+), 1072 deletions(-) create mode 100644 test/e2e/instrumentation/common/BUILD rename test/e2e/instrumentation/{ => common}/framework.go (91%) create mode 100644 test/e2e/instrumentation/imports.go create mode 100644 test/e2e/instrumentation/logging/elasticsearch/BUILD rename test/e2e/instrumentation/logging/{es.go => elasticsearch/basic.go} (52%) rename test/e2e/instrumentation/logging/{es_kibana.go => elasticsearch/kibana.go} (58%) rename test/e2e/instrumentation/logging/{es_utils.go => elasticsearch/utils.go} (81%) create mode 100644 test/e2e/instrumentation/logging/imports.go delete mode 100644 test/e2e/instrumentation/logging/sd.go delete mode 100644 test/e2e/instrumentation/logging/sd_events.go delete mode 100644 test/e2e/instrumentation/logging/sd_soak.go delete mode 100644 test/e2e/instrumentation/logging/sd_utils.go create mode 100644 test/e2e/instrumentation/logging/stackdrvier/BUILD create mode 100644 test/e2e/instrumentation/logging/stackdrvier/basic.go create mode 100644 test/e2e/instrumentation/logging/stackdrvier/soak.go create mode 100644 test/e2e/instrumentation/logging/stackdrvier/utils.go delete mode 100644 test/e2e/instrumentation/logging/utils.go create mode 100644 test/e2e/instrumentation/logging/utils/BUILD create mode 100644 test/e2e/instrumentation/logging/utils/log_provider.go create mode 100644 test/e2e/instrumentation/logging/utils/logging_agent.go create mode 100644 test/e2e/instrumentation/logging/utils/logging_pod.go create mode 100644 test/e2e/instrumentation/logging/utils/misc.go create mode 100644 test/e2e/instrumentation/logging/utils/types.go create mode 100644 test/e2e/instrumentation/logging/utils/wait.go diff --git 
a/test/e2e/BUILD b/test/e2e/BUILD index 7618d46bf26..cc5cee03412 100644 --- a/test/e2e/BUILD +++ b/test/e2e/BUILD @@ -21,8 +21,7 @@ go_test( "//test/e2e/apimachinery:go_default_library", "//test/e2e/autoscaling:go_default_library", "//test/e2e/framework:go_default_library", - "//test/e2e/instrumentation/logging:go_default_library", - "//test/e2e/instrumentation/monitoring:go_default_library", + "//test/e2e/instrumentation:go_default_library", "//test/e2e/kubectl:go_default_library", "//test/e2e/lifecycle:go_default_library", "//test/e2e/lifecycle/bootstrap:go_default_library", diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index ae07686919e..46230bcc3d7 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -22,8 +22,7 @@ import ( _ "k8s.io/kubernetes/test/e2e/apimachinery" _ "k8s.io/kubernetes/test/e2e/autoscaling" "k8s.io/kubernetes/test/e2e/framework" - _ "k8s.io/kubernetes/test/e2e/instrumentation/logging" - _ "k8s.io/kubernetes/test/e2e/instrumentation/monitoring" + _ "k8s.io/kubernetes/test/e2e/instrumentation" _ "k8s.io/kubernetes/test/e2e/kubectl" _ "k8s.io/kubernetes/test/e2e/lifecycle" _ "k8s.io/kubernetes/test/e2e/lifecycle/bootstrap" diff --git a/test/e2e/instrumentation/BUILD b/test/e2e/instrumentation/BUILD index 5fdcf57ebca..456da8b01a3 100644 --- a/test/e2e/instrumentation/BUILD +++ b/test/e2e/instrumentation/BUILD @@ -9,9 +9,12 @@ load( go_library( name = "go_default_library", - srcs = ["framework.go"], + srcs = ["imports.go"], tags = ["automanaged"], - deps = ["//vendor/github.com/onsi/ginkgo:go_default_library"], + deps = [ + "//test/e2e/instrumentation/logging:go_default_library", + "//test/e2e/instrumentation/monitoring:go_default_library", + ], ) filegroup( @@ -25,6 +28,7 @@ filegroup( name = "all-srcs", srcs = [ ":package-srcs", + "//test/e2e/instrumentation/common:all-srcs", "//test/e2e/instrumentation/logging:all-srcs", "//test/e2e/instrumentation/monitoring:all-srcs", ], diff --git a/test/e2e/instrumentation/common/BUILD b/test/e2e/instrumentation/common/BUILD new file mode 100644 index 00000000000..57be5ca8100 --- /dev/null +++ b/test/e2e/instrumentation/common/BUILD @@ -0,0 +1,28 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["framework.go"], + tags = ["automanaged"], + deps = ["//vendor/github.com/onsi/ginkgo:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/test/e2e/instrumentation/framework.go b/test/e2e/instrumentation/common/framework.go similarity index 91% rename from test/e2e/instrumentation/framework.go rename to test/e2e/instrumentation/common/framework.go index b58c40f4162..af423632bb0 100644 --- a/test/e2e/instrumentation/framework.go +++ b/test/e2e/instrumentation/common/framework.go @@ -14,10 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -package instrumentation +package common import "github.com/onsi/ginkgo" +// SIGDescribe annotates the test with the SIG label. 
func SIGDescribe(text string, body func()) bool { return ginkgo.Describe("[sig-instrumentation] "+text, body) } diff --git a/test/e2e/instrumentation/imports.go b/test/e2e/instrumentation/imports.go new file mode 100644 index 00000000000..fed261d4715 --- /dev/null +++ b/test/e2e/instrumentation/imports.go @@ -0,0 +1,22 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instrumentation + +import ( + _ "k8s.io/kubernetes/test/e2e/instrumentation/logging" + _ "k8s.io/kubernetes/test/e2e/instrumentation/monitoring" +) diff --git a/test/e2e/instrumentation/logging/BUILD b/test/e2e/instrumentation/logging/BUILD index 4285e4a9c0c..a91c2350537 100644 --- a/test/e2e/instrumentation/logging/BUILD +++ b/test/e2e/instrumentation/logging/BUILD @@ -10,34 +10,18 @@ load( go_library( name = "go_default_library", srcs = [ - "es.go", - "es_kibana.go", - "es_utils.go", "generic_soak.go", - "sd.go", - "sd_events.go", - "sd_soak.go", - "sd_utils.go", - "utils.go", + "imports.go", ], tags = ["automanaged"], deps = [ - "//pkg/api:go_default_library", "//test/e2e/framework:go_default_library", - "//test/e2e/instrumentation:go_default_library", + "//test/e2e/instrumentation/common:go_default_library", + "//test/e2e/instrumentation/logging/elasticsearch:go_default_library", + "//test/e2e/instrumentation/logging/stackdrvier:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", "//vendor/github.com/onsi/gomega:go_default_library", - "//vendor/golang.org/x/net/context:go_default_library", - "//vendor/golang.org/x/oauth2/google:go_default_library", - "//vendor/google.golang.org/api/logging/v2beta1:go_default_library", - "//vendor/google.golang.org/api/pubsub/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/client-go/util/integer:go_default_library", ], ) @@ -50,6 +34,11 @@ filegroup( filegroup( name = "all-srcs", - srcs = [":package-srcs"], + srcs = [ + ":package-srcs", + "//test/e2e/instrumentation/logging/elasticsearch:all-srcs", + "//test/e2e/instrumentation/logging/stackdrvier:all-srcs", + "//test/e2e/instrumentation/logging/utils:all-srcs", + ], tags = ["automanaged"], ) diff --git a/test/e2e/instrumentation/logging/elasticsearch/BUILD b/test/e2e/instrumentation/logging/elasticsearch/BUILD new file mode 100644 index 00000000000..29d194053d5 --- /dev/null +++ b/test/e2e/instrumentation/logging/elasticsearch/BUILD @@ -0,0 +1,43 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "basic.go", + "kibana.go", + "utils.go", + ], + tags = 
["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//test/e2e/framework:go_default_library", + "//test/e2e/instrumentation/common:go_default_library", + "//test/e2e/instrumentation/logging/utils:go_default_library", + "//vendor/github.com/onsi/ginkgo:go_default_library", + "//vendor/github.com/onsi/gomega:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/test/e2e/instrumentation/logging/es.go b/test/e2e/instrumentation/logging/elasticsearch/basic.go similarity index 52% rename from test/e2e/instrumentation/logging/es.go rename to test/e2e/instrumentation/logging/elasticsearch/basic.go index f6a9770f4d9..6853eadb025 100644 --- a/test/e2e/instrumentation/logging/es.go +++ b/test/e2e/instrumentation/logging/elasticsearch/basic.go @@ -14,55 +14,48 @@ See the License for the specific language governing permissions and limitations under the License. */ -package logging +package elasticsearch import ( - "fmt" "time" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/test/e2e/framework" - "k8s.io/kubernetes/test/e2e/instrumentation" + instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" + "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils" - . "github.com/onsi/ginkgo" + "github.com/onsi/ginkgo" ) var _ = instrumentation.SIGDescribe("Cluster level logging using Elasticsearch [Feature:Elasticsearch]", func() { f := framework.NewDefaultFramework("es-logging") - BeforeEach(func() { + ginkgo.BeforeEach(func() { // TODO: For now assume we are only testing cluster logging with Elasticsearch // on GCE. Once we are sure that Elasticsearch cluster level logging // works for other providers we should widen this scope of this test. 
framework.SkipUnlessProviderIs("gce") }) - It("should check that logs from containers are ingested into Elasticsearch", func() { - podName := "synthlogger" - esLogsProvider, err := newEsLogsProvider(f) + ginkgo.It("should check that logs from containers are ingested into Elasticsearch", func() { + ingestionInterval := 10 * time.Second + ingestionTimeout := 10 * time.Minute + + p, err := newEsLogProvider(f) framework.ExpectNoError(err, "Failed to create Elasticsearch logs provider") - err = esLogsProvider.Init() - defer esLogsProvider.Cleanup() + err = p.Init() + defer p.Cleanup() framework.ExpectNoError(err, "Failed to init Elasticsearch logs provider") - err = ensureSingleFluentdOnEachNode(f, esLogsProvider.FluentdApplicationName()) + err = utils.EnsureLoggingAgentDeployment(f, p.LoggingAgentName()) framework.ExpectNoError(err, "Fluentd deployed incorrectly") - By("Running synthetic logger") - pod := startNewLoggingPod(f, podName, "", 10*60, 10*time.Minute) - defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) - err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name) - framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName)) + pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger", "test"), f) + framework.ExpectNoError(err, "Failed to start a pod") - By("Waiting for logs to ingest") - config := &loggingTestConfig{ - LogsProvider: esLogsProvider, - Pods: []*loggingPod{pod}, - IngestionTimeout: 10 * time.Minute, - MaxAllowedLostFraction: 0, - MaxAllowedFluentdRestarts: 0, - } - framework.ExpectNoError(waitForSomeLogs(f, config), "Failed to ingest logs") + ginkgo.By("Waiting for logs to ingest") + c := utils.NewLogChecker(p, utils.UntilFirstEntry, utils.JustTimeout, pod.Name()) + err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) + framework.ExpectNoError(err) }) }) diff --git a/test/e2e/instrumentation/logging/es_kibana.go b/test/e2e/instrumentation/logging/elasticsearch/kibana.go similarity index 58% rename from test/e2e/instrumentation/logging/es_kibana.go rename to test/e2e/instrumentation/logging/elasticsearch/kibana.go index f93bb45c268..cc4a4c9c446 100644 --- a/test/e2e/instrumentation/logging/es_kibana.go +++ b/test/e2e/instrumentation/logging/elasticsearch/kibana.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package logging +package elasticsearch import ( "context" @@ -23,23 +23,24 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/test/e2e/framework" - "k8s.io/kubernetes/test/e2e/instrumentation" + instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/util/wait" ) var _ = instrumentation.SIGDescribe("Kibana Logging Instances Is Alive [Feature:Elasticsearch]", func() { f := framework.NewDefaultFramework("kibana-logging") - BeforeEach(func() { + ginkgo.BeforeEach(func() { // TODO: For now assume we are only testing cluster logging with Elasticsearch // and Kibana on GCE. Once we are sure that Elasticsearch and Kibana cluster level logging // works for other providers we should widen this scope of this test. 
framework.SkipUnlessProviderIs("gce") }) - It("should check that the Kibana logging instance is alive", func() { + ginkgo.It("should check that the Kibana logging instance is alive", func() { ClusterLevelLoggingWithKibana(f) }) }) @@ -51,61 +52,54 @@ const ( // ClusterLevelLoggingWithKibana is an end to end test that checks to see if Kibana is alive. func ClusterLevelLoggingWithKibana(f *framework.Framework) { - // graceTime is how long to keep retrying requests for status information. - const graceTime = 20 * time.Minute + const pollingInterval = 10 * time.Second + const pollingTimeout = 20 * time.Minute // Check for the existence of the Kibana service. - By("Checking the Kibana service exists.") + ginkgo.By("Checking the Kibana service exists.") s := f.ClientSet.Core().Services(metav1.NamespaceSystem) // Make a few attempts to connect. This makes the test robust against // being run as the first e2e test just after the e2e cluster has been created. - var err error - for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) { - if _, err = s.Get("kibana-logging", metav1.GetOptions{}); err == nil { - break + err := wait.Poll(pollingInterval, pollingTimeout, func() (bool, error) { + if _, err := s.Get("kibana-logging", metav1.GetOptions{}); err != nil { + framework.Logf("Kibana is unreachable: %v", err) + return false, nil } - framework.Logf("Attempt to check for the existence of the Kibana service failed after %v", time.Since(start)) - } - Expect(err).NotTo(HaveOccurred()) + return true, nil + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) // Wait for the Kibana pod(s) to enter the running state. - By("Checking to make sure the Kibana pods are running") + ginkgo.By("Checking to make sure the Kibana pods are running") label := labels.SelectorFromSet(labels.Set(map[string]string{kibanaKey: kibanaValue})) options := metav1.ListOptions{LabelSelector: label.String()} pods, err := f.ClientSet.Core().Pods(metav1.NamespaceSystem).List(options) - Expect(err).NotTo(HaveOccurred()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) for _, pod := range pods.Items { err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod) - Expect(err).NotTo(HaveOccurred()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } - By("Checking to make sure we get a response from the Kibana UI.") - err = nil - for start := time.Now(); time.Since(start) < graceTime; time.Sleep(5 * time.Second) { - proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) - if errProxy != nil { - framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy) - err = errProxy - continue + ginkgo.By("Checking to make sure we get a response from the Kibana UI.") + err = wait.Poll(pollingInterval, pollingTimeout, func() (bool, error) { + req, err := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) + if err != nil { + framework.Logf("Failed to get services proxy request: %v", err) + return false, nil } ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout) defer cancel() - // Query against the root URL for Kibana. - _, err = proxyRequest.Namespace(metav1.NamespaceSystem). + _, err = req.Namespace(metav1.NamespaceSystem). Context(ctx). Name("kibana-logging"). 
DoRaw() if err != nil { - if ctx.Err() != nil { - framework.Failf("After %v proxy call to kibana-logging failed: %v", time.Since(start), err) - break - } - framework.Logf("After %v proxy call to kibana-logging failed: %v", time.Since(start), err) - continue + framework.Logf("Proxy call to kibana-logging failed: %v", err) + return false, nil } - break - } - Expect(err).NotTo(HaveOccurred()) + return true, nil + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } diff --git a/test/e2e/instrumentation/logging/es_utils.go b/test/e2e/instrumentation/logging/elasticsearch/utils.go similarity index 81% rename from test/e2e/instrumentation/logging/es_utils.go rename to test/e2e/instrumentation/logging/elasticsearch/utils.go index 9737981ee6c..8c06c0cc047 100644 --- a/test/e2e/instrumentation/logging/es_utils.go +++ b/test/e2e/instrumentation/logging/elasticsearch/utils.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package logging +package elasticsearch import ( "encoding/json" @@ -26,31 +26,35 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/test/e2e/framework" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" + "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils" ) const ( // esRetryTimeout is how long to keep retrying requesting elasticsearch for status information. esRetryTimeout = 5 * time.Minute + // esRetryDelay is how much time to wait between two attempts to send a request to elasticsearch esRetryDelay = 5 * time.Second + + // searchPageSize is how many entries to search for in Elasticsearch. + searchPageSize = 1000 ) -type esLogsProvider struct { +var _ utils.LogProvider = &esLogProvider{} + +type esLogProvider struct { Framework *framework.Framework } -func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) { - return &esLogsProvider{Framework: f}, nil +func newEsLogProvider(f *framework.Framework) (*esLogProvider, error) { + return &esLogProvider{Framework: f}, nil } // Ensures that elasticsearch is running and ready to serve requests -func (logsProvider *esLogsProvider) Init() error { - f := logsProvider.Framework +func (p *esLogProvider) Init() error { + f := p.Framework // Check for the existence of the Elasticsearch service. - By("Checking the Elasticsearch service exists.") + framework.Logf("Checking the Elasticsearch service exists.") s := f.ClientSet.Core().Services(api.NamespaceSystem) // Make a few attempts to connect. This makes the test robust against // being run as the first e2e test just after the e2e cluster has been created. @@ -61,20 +65,26 @@ func (logsProvider *esLogsProvider) Init() error { } framework.Logf("Attempt to check for the existence of the Elasticsearch service failed after %v", time.Since(start)) } - Expect(err).NotTo(HaveOccurred()) + if err != nil { + return err + } // Wait for the Elasticsearch pods to enter the running state. 
- By("Checking to make sure the Elasticsearch pods are running") + framework.Logf("Checking to make sure the Elasticsearch pods are running") labelSelector := fields.SelectorFromSet(fields.Set(map[string]string{"k8s-app": "elasticsearch-logging"})).String() options := meta_v1.ListOptions{LabelSelector: labelSelector} pods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options) - Expect(err).NotTo(HaveOccurred()) + if err != nil { + return err + } for _, pod := range pods.Items { err = framework.WaitForPodRunningInNamespace(f.ClientSet, &pod) - Expect(err).NotTo(HaveOccurred()) + if err != nil { + return err + } } - By("Checking to make sure we are talking to an Elasticsearch service.") + framework.Logf("Checking to make sure we are talking to an Elasticsearch service.") // Perform a few checks to make sure this looks like an Elasticsearch cluster. var statusCode int err = nil @@ -102,14 +112,16 @@ func (logsProvider *esLogsProvider) Init() error { } break } - Expect(err).NotTo(HaveOccurred()) + if err != nil { + return err + } if int(statusCode) != 200 { framework.Failf("Elasticsearch cluster has a bad status: %v", statusCode) } // Now assume we really are talking to an Elasticsearch instance. // Check the cluster health. - By("Checking health of Elasticsearch service.") + framework.Logf("Checking health of Elasticsearch service.") healthy := false for start := time.Now(); time.Since(start) < esRetryTimeout; time.Sleep(esRetryDelay) { proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) @@ -153,12 +165,12 @@ func (logsProvider *esLogsProvider) Init() error { return nil } -func (logsProvider *esLogsProvider) Cleanup() { +func (p *esLogProvider) Cleanup() { // Nothing to do } -func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry { - f := logsProvider.Framework +func (p *esLogProvider) ReadEntries(name string) []utils.LogEntry { + f := p.Framework proxyRequest, errProxy := framework.GetServicesProxyRequest(f.ClientSet, f.ClientSet.Core().RESTClient().Get()) if errProxy != nil { @@ -166,7 +178,7 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry { return nil } - query := fmt.Sprintf("kubernetes.pod_name:%s AND kubernetes.namespace_name:%s", pod.Name, f.Namespace.Name) + query := fmt.Sprintf("kubernetes.pod_name:%s AND kubernetes.namespace_name:%s", name, f.Namespace.Name) framework.Logf("Sending a search request to Elasticsearch with the following query: %s", query) // Ask Elasticsearch to return all the log lines that were tagged with the @@ -176,7 +188,7 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry { Suffix("_search"). Param("q", query). // Ask for more in case we included some unrelated records in our query - Param("size", strconv.Itoa(pod.ExpectedLinesNumber*10)). + Param("size", strconv.Itoa(searchPageSize)). DoRaw() if err != nil { framework.Logf("Failed to make proxy call to elasticsearch-logging: %v", err) @@ -202,7 +214,7 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry { return nil } - entries := []logEntry{} + entries := []utils.LogEntry{} // Iterate over the hits and populate the observed array. 
for _, e := range h { l, ok := e.(map[string]interface{}) @@ -218,17 +230,23 @@ func (logsProvider *esLogsProvider) ReadEntries(pod *loggingPod) []logEntry { } msg, ok := source["log"].(string) - if !ok { - framework.Logf("Log not of the expected type: %T", source["log"]) + if ok { + entries = append(entries, utils.LogEntry{TextPayload: msg}) continue } - entries = append(entries, logEntry{Payload: msg}) + obj, ok := source["log"].(map[string]interface{}) + if ok { + entries = append(entries, utils.LogEntry{JSONPayload: obj}) + continue + } + + framework.Logf("Log is of unknown type, got %v, want string or object in field 'log'", source) } return entries } -func (logsProvider *esLogsProvider) FluentdApplicationName() string { +func (p *esLogProvider) LoggingAgentName() string { return "fluentd-es" } diff --git a/test/e2e/instrumentation/logging/generic_soak.go b/test/e2e/instrumentation/logging/generic_soak.go index 5250d5590f3..40535b6ce19 100644 --- a/test/e2e/instrumentation/logging/generic_soak.go +++ b/test/e2e/instrumentation/logging/generic_soak.go @@ -27,7 +27,7 @@ import ( . "github.com/onsi/gomega" "k8s.io/api/core/v1" "k8s.io/kubernetes/test/e2e/framework" - "k8s.io/kubernetes/test/e2e/instrumentation" + instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" ) var _ = instrumentation.SIGDescribe("Logging soak [Performance] [Slow] [Disruptive]", func() { diff --git a/test/e2e/instrumentation/logging/imports.go b/test/e2e/instrumentation/logging/imports.go new file mode 100644 index 00000000000..a3621e7bdc9 --- /dev/null +++ b/test/e2e/instrumentation/logging/imports.go @@ -0,0 +1,22 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logging + +import ( + _ "k8s.io/kubernetes/test/e2e/instrumentation/logging/elasticsearch" + _ "k8s.io/kubernetes/test/e2e/instrumentation/logging/stackdrvier" +) diff --git a/test/e2e/instrumentation/logging/sd.go b/test/e2e/instrumentation/logging/sd.go deleted file mode 100644 index bc12b0c7917..00000000000 --- a/test/e2e/instrumentation/logging/sd.go +++ /dev/null @@ -1,66 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package logging - -import ( - "fmt" - "time" - - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubernetes/test/e2e/framework" - "k8s.io/kubernetes/test/e2e/instrumentation" - - . 
"github.com/onsi/ginkgo" -) - -var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver", func() { - f := framework.NewDefaultFramework("sd-logging") - - BeforeEach(func() { - framework.SkipUnlessProviderIs("gce", "gke") - }) - - It("should ingest logs from applications", func() { - podName := "synthlogger" - - sdLogsProvider, err := newSdLogsProvider(f) - framework.ExpectNoError(err, "Failed to create Stackdriver logs provider") - - err = sdLogsProvider.Init() - defer sdLogsProvider.Cleanup() - framework.ExpectNoError(err, "Failed to init Stackdriver logs provider") - - err = ensureSingleFluentdOnEachNode(f, sdLogsProvider.FluentdApplicationName()) - framework.ExpectNoError(err, "Fluentd deployed incorrectly") - - By("Running synthetic logger") - pod := startNewLoggingPod(f, podName, "", 10*60, 10*time.Minute) - defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) - err = framework.WaitForPodNameRunningInNamespace(f.ClientSet, podName, f.Namespace.Name) - framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to be running", podName)) - - By("Waiting for logs to ingest") - config := &loggingTestConfig{ - LogsProvider: sdLogsProvider, - Pods: []*loggingPod{pod}, - IngestionTimeout: 10 * time.Minute, - MaxAllowedLostFraction: 0, - MaxAllowedFluentdRestarts: 0, - } - framework.ExpectNoError(waitForSomeLogs(f, config), "Failed to ingest logs") - }) -}) diff --git a/test/e2e/instrumentation/logging/sd_events.go b/test/e2e/instrumentation/logging/sd_events.go deleted file mode 100644 index 10ec311f93f..00000000000 --- a/test/e2e/instrumentation/logging/sd_events.go +++ /dev/null @@ -1,94 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package logging - -import ( - "time" - - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/kubernetes/test/e2e/framework" - "k8s.io/kubernetes/test/e2e/instrumentation" - - . "github.com/onsi/ginkgo" -) - -const ( - // eventsIngestionTimeout is the amount of time to wait until some - // events are ingested. - eventsIngestionTimeout = 10 * time.Minute - - // eventPollingInterval is the delay between attempts to read events - // from the logs provider. - eventPollingInterval = 1 * time.Second - - // eventCreationInterval is the minimal delay between two events - // created for testing purposes. 
- eventCreationInterval = 10 * time.Second -) - -var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver", func() { - f := framework.NewDefaultFramework("sd-logging-events") - - BeforeEach(func() { - framework.SkipUnlessProviderIs("gce", "gke") - }) - - It("should ingest events", func() { - sdLogsProvider, err := newSdLogsProvider(f) - framework.ExpectNoError(err, "Failed to create Stackdriver logs provider") - - err = sdLogsProvider.Init() - defer sdLogsProvider.Cleanup() - framework.ExpectNoError(err, "Failed to init Stackdriver logs provider") - - stopCh := make(chan struct{}) - successCh := make(chan struct{}) - go func() { - wait.Poll(eventPollingInterval, eventsIngestionTimeout, func() (bool, error) { - events := sdLogsProvider.ReadEvents() - if len(events) > 0 { - framework.Logf("Some events are ingested, sample event: %v", events[0]) - close(successCh) - return true, nil - } - return false, nil - }) - close(stopCh) - }() - - By("Running pods to generate events while waiting for some of them to be ingested") - wait.PollUntil(eventCreationInterval, func() (bool, error) { - podName := "synthlogger" - startNewLoggingPod(f, podName, "", 1, 1*time.Second) - defer f.PodClient().Delete(podName, &meta_v1.DeleteOptions{}) - err = framework.WaitForPodSuccessInNamespace(f.ClientSet, podName, f.Namespace.Name) - if err != nil { - framework.Logf("Failed to wait pod %s to successfully complete due to %v", podName, err) - } - - return false, nil - }, stopCh) - - select { - case <-successCh: - break - default: - framework.Failf("No events are present in Stackdriver after %v", eventsIngestionTimeout) - } - }) -}) diff --git a/test/e2e/instrumentation/logging/sd_soak.go b/test/e2e/instrumentation/logging/sd_soak.go deleted file mode 100644 index fbd40cbba5d..00000000000 --- a/test/e2e/instrumentation/logging/sd_soak.go +++ /dev/null @@ -1,112 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package logging - -import ( - "fmt" - "math" - "time" - - "k8s.io/kubernetes/test/e2e/framework" - "k8s.io/kubernetes/test/e2e/instrumentation" - - . "github.com/onsi/ginkgo" -) - -const ( - // maxAllowedLostFraction is the fraction of lost logs considered acceptable. - maxAllowedLostFraction = 0.01 - // maxAllowedRestartsPerHour is the number of fluentd container restarts - // considered acceptable. Once per hour is fine for now, as long as it - // doesn't loose too much logs. - maxAllowedRestartsPerHour = 1.0 - // lastPodIngestionSlack is the amount of time to wait for the last pod's - // logs to be ingested by the logging agent. 
- lastPodIngestionSlack = 5 * time.Minute -) - -var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() { - f := framework.NewDefaultFramework("sd-logging-load") - - It("should ingest logs from applications running for a prolonged amount of time", func() { - sdLogsProvider, err := newSdLogsProvider(f) - framework.ExpectNoError(err, "Failed to create Stackdriver logs provider") - - err = sdLogsProvider.Init() - defer sdLogsProvider.Cleanup() - framework.ExpectNoError(err, "Failed to init Stackdriver logs provider") - - nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items - maxPodCount := 10 - jobDuration := 30 * time.Minute - linesPerPodPerSecond := 100 - // TODO(crassirostris): Increase to 21 hrs - testDuration := 3 * time.Hour - ingestionTimeout := testDuration + 30*time.Minute - allowedRestarts := int(math.Ceil(float64(testDuration) / - float64(time.Hour) * maxAllowedRestartsPerHour)) - - podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount)) - podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1 - linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds()) - - // pods is a flat array of all pods to be run and to expect in Stackdriver. - pods := []*loggingPod{} - // podsByRun is a two-dimensional array of pods, first dimension is the run - // index, the second dimension is the node index. Since we want to create - // an equal load on all nodes, for the same run we have one pod per node. - podsByRun := [][]*loggingPod{} - for runIdx := 0; runIdx < podRunCount; runIdx++ { - podsInRun := []*loggingPod{} - for nodeIdx, node := range nodes { - podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx) - pod := newLoggingPod(podName, node.Name, linesPerPod, jobDuration) - pods = append(pods, pod) - podsInRun = append(podsInRun, pod) - } - podsByRun = append(podsByRun, podsInRun) - } - - By("Running short-living pods") - go func() { - for runIdx := 0; runIdx < podRunCount; runIdx++ { - // Starting one pod on each node. - for _, pod := range podsByRun[runIdx] { - pod.Start(f) - } - time.Sleep(podRunDelay) - } - // Waiting until the last pod has completed - time.Sleep(jobDuration - podRunDelay + lastPodIngestionSlack) - }() - - By("Waiting for all log lines to be ingested") - config := &loggingTestConfig{ - LogsProvider: sdLogsProvider, - Pods: pods, - IngestionTimeout: ingestionTimeout, - MaxAllowedLostFraction: maxAllowedLostFraction, - MaxAllowedFluentdRestarts: allowedRestarts, - } - err = waitForFullLogsIngestion(f, config) - if err != nil { - framework.Failf("Failed to ingest logs: %v", err) - } else { - framework.Logf("Successfully ingested all logs") - } - }) -}) diff --git a/test/e2e/instrumentation/logging/sd_utils.go b/test/e2e/instrumentation/logging/sd_utils.go deleted file mode 100644 index f8c6210e7fa..00000000000 --- a/test/e2e/instrumentation/logging/sd_utils.go +++ /dev/null @@ -1,356 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package logging - -import ( - "encoding/base64" - "encoding/json" - "fmt" - "sync" - "time" - - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/kubernetes/test/e2e/framework" - - "golang.org/x/net/context" - "golang.org/x/oauth2/google" - sd "google.golang.org/api/logging/v2beta1" - pubsub "google.golang.org/api/pubsub/v1" -) - -const ( - // The amount of time to wait for Stackdriver Logging - // sink to become operational - sinkStartupTimeout = 10 * time.Minute - - // The limit on the number of messages to pull from PubSub - maxPullLogMessages = 100 * 1000 - - // The limit on the number of messages in the single cache - maxCacheSize = 10 * 1000 - - // PubSub topic with log entries polling interval - sdLoggingPollInterval = 100 * time.Millisecond -) - -type sdLogsProvider struct { - SdService *sd.Service - PubsubService *pubsub.Service - Framework *framework.Framework - Topic *pubsub.Topic - Subscription *pubsub.Subscription - LogSink *sd.LogSink - LogEntryCache map[string]chan logEntry - EventCache chan map[string]interface{} - CacheMutex *sync.Mutex - PollingStopChannel chan struct{} -} - -func newSdLogsProvider(f *framework.Framework) (*sdLogsProvider, error) { - ctx := context.Background() - hc, err := google.DefaultClient(ctx, sd.CloudPlatformScope) - sdService, err := sd.New(hc) - if err != nil { - return nil, err - } - - pubsubService, err := pubsub.New(hc) - if err != nil { - return nil, err - } - - provider := &sdLogsProvider{ - SdService: sdService, - PubsubService: pubsubService, - Framework: f, - LogEntryCache: map[string]chan logEntry{}, - EventCache: make(chan map[string]interface{}, maxCacheSize), - CacheMutex: &sync.Mutex{}, - PollingStopChannel: make(chan struct{}, 1), - } - return provider, nil -} - -func (sdLogsProvider *sdLogsProvider) Init() error { - projectId := framework.TestContext.CloudConfig.ProjectID - nsName := sdLogsProvider.Framework.Namespace.Name - - topic, err := sdLogsProvider.createPubSubTopic(projectId, nsName) - if err != nil { - return fmt.Errorf("failed to create PubSub topic: %v", err) - } - sdLogsProvider.Topic = topic - - subs, err := sdLogsProvider.createPubSubSubscription(projectId, nsName, topic.Name) - if err != nil { - return fmt.Errorf("failed to create PubSub subscription: %v", err) - } - sdLogsProvider.Subscription = subs - - logSink, err := sdLogsProvider.createSink(projectId, nsName, nsName, topic.Name) - if err != nil { - return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err) - } - sdLogsProvider.LogSink = logSink - - if err = sdLogsProvider.authorizeSink(); err != nil { - return fmt.Errorf("failed to authorize log sink: %v", err) - } - - if err = sdLogsProvider.waitSinkInit(); err != nil { - return fmt.Errorf("failed to wait for sink to become operational: %v", err) - } - - go sdLogsProvider.pollLogs() - - return nil -} - -func (sdLogsProvider *sdLogsProvider) createPubSubTopic(projectId, topicName string) (*pubsub.Topic, error) { - topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectId, topicName) - topic := &pubsub.Topic{ - Name: topicFullName, - } - return sdLogsProvider.PubsubService.Projects.Topics.Create(topicFullName, topic).Do() -} - -func (sdLogsProvider *sdLogsProvider) createPubSubSubscription(projectId, subsName, topicName string) (*pubsub.Subscription, error) { - subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectId, subsName) - subs := &pubsub.Subscription{ - Name: 
subsFullName, - Topic: topicName, - } - return sdLogsProvider.PubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do() -} - -func (sdLogsProvider *sdLogsProvider) createSink(projectId, nsName, sinkName, topicName string) (*sd.LogSink, error) { - projectDst := fmt.Sprintf("projects/%s", projectId) - filter := fmt.Sprintf("(resource.type=\"gke_cluster\" AND jsonPayload.kind=\"Event\" AND jsonPayload.metadata.namespace=\"%s\") OR "+ - "(resource.type=\"container\" AND resource.labels.namespace_id=\"%s\")", nsName, nsName) - framework.Logf("Using the following filter for entries: %s", filter) - sink := &sd.LogSink{ - Name: sinkName, - Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName), - Filter: filter, - } - return sdLogsProvider.SdService.Projects.Sinks.Create(projectDst, sink).Do() -} - -func (sdLogsProvider *sdLogsProvider) authorizeSink() error { - topicsService := sdLogsProvider.PubsubService.Projects.Topics - policy, err := topicsService.GetIamPolicy(sdLogsProvider.Topic.Name).Do() - if err != nil { - return err - } - - binding := &pubsub.Binding{ - Role: "roles/pubsub.publisher", - Members: []string{sdLogsProvider.LogSink.WriterIdentity}, - } - policy.Bindings = append(policy.Bindings, binding) - req := &pubsub.SetIamPolicyRequest{Policy: policy} - if _, err = topicsService.SetIamPolicy(sdLogsProvider.Topic.Name, req).Do(); err != nil { - return err - } - - return nil -} - -func (sdLogsProvider *sdLogsProvider) waitSinkInit() error { - framework.Logf("Waiting for log sink to become operational") - return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) { - err := publish(sdLogsProvider.PubsubService, sdLogsProvider.Topic, "embrace eternity") - if err != nil { - framework.Logf("Failed to push message to PubSub due to %v", err) - } - - messages, err := pullAndAck(sdLogsProvider.PubsubService, sdLogsProvider.Subscription) - if err != nil { - framework.Logf("Failed to pull messages from PubSub due to %v", err) - return false, nil - } - if len(messages) > 0 { - framework.Logf("Sink %s is operational", sdLogsProvider.LogSink.Name) - return true, nil - } - - return false, nil - }) -} - -func (sdLogsProvider *sdLogsProvider) pollLogs() { - wait.PollUntil(sdLoggingPollInterval, func() (bool, error) { - messages, err := pullAndAck(sdLogsProvider.PubsubService, sdLogsProvider.Subscription) - if err != nil { - framework.Logf("Failed to pull messages from PubSub due to %v", err) - return false, nil - } - - for _, msg := range messages { - logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data) - if err != nil { - framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data) - continue - } - - var sdLogEntry sd.LogEntry - if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil { - framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err) - continue - } - - switch sdLogEntry.Resource.Type { - case "container": - podName := sdLogEntry.Resource.Labels["pod_id"] - ch := sdLogsProvider.getCacheChannel(podName) - ch <- logEntry{Payload: sdLogEntry.TextPayload} - break - case "gke_cluster": - jsonPayloadRaw, err := sdLogEntry.JsonPayload.MarshalJSON() - if err != nil { - framework.Logf("Failed to get jsonPayload from LogEntry %v", sdLogEntry) - break - } - var eventObject map[string]interface{} - err = json.Unmarshal(jsonPayloadRaw, &eventObject) - if err != nil { - framework.Logf("Failed to deserialize jsonPayload as json object %s", string(jsonPayloadRaw[:])) - break - } - 
sdLogsProvider.EventCache <- eventObject - break - default: - framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type) - break - } - } - - return false, nil - }, sdLogsProvider.PollingStopChannel) -} - -func (sdLogsProvider *sdLogsProvider) Cleanup() { - sdLogsProvider.PollingStopChannel <- struct{}{} - - if sdLogsProvider.LogSink != nil { - projectId := framework.TestContext.CloudConfig.ProjectID - sinkNameId := fmt.Sprintf("projects/%s/sinks/%s", projectId, sdLogsProvider.LogSink.Name) - sinksService := sdLogsProvider.SdService.Projects.Sinks - if _, err := sinksService.Delete(sinkNameId).Do(); err != nil { - framework.Logf("Failed to delete LogSink: %v", err) - } - } - - if sdLogsProvider.Subscription != nil { - subsService := sdLogsProvider.PubsubService.Projects.Subscriptions - if _, err := subsService.Delete(sdLogsProvider.Subscription.Name).Do(); err != nil { - framework.Logf("Failed to delete PubSub subscription: %v", err) - } - } - - if sdLogsProvider.Topic != nil { - topicsService := sdLogsProvider.PubsubService.Projects.Topics - if _, err := topicsService.Delete(sdLogsProvider.Topic.Name).Do(); err != nil { - framework.Logf("Failed to delete PubSub topic: %v", err) - } - } -} - -func (sdLogsProvider *sdLogsProvider) ReadEntries(pod *loggingPod) []logEntry { - var entries []logEntry - ch := sdLogsProvider.getCacheChannel(pod.Name) -polling_loop: - for { - select { - case entry := <-ch: - entries = append(entries, entry) - default: - break polling_loop - } - } - return entries -} - -func (logsProvider *sdLogsProvider) FluentdApplicationName() string { - return "fluentd-gcp" -} - -func (sdLogsProvider *sdLogsProvider) ReadEvents() []map[string]interface{} { - var events []map[string]interface{} -polling_loop: - for { - select { - case event := <-sdLogsProvider.EventCache: - events = append(events, event) - default: - break polling_loop - } - } - return events -} - -func (sdLogsProvider *sdLogsProvider) getCacheChannel(podName string) chan logEntry { - sdLogsProvider.CacheMutex.Lock() - defer sdLogsProvider.CacheMutex.Unlock() - - if ch, ok := sdLogsProvider.LogEntryCache[podName]; ok { - return ch - } - - newCh := make(chan logEntry, maxCacheSize) - sdLogsProvider.LogEntryCache[podName] = newCh - return newCh -} - -func pullAndAck(service *pubsub.Service, subs *pubsub.Subscription) ([]*pubsub.ReceivedMessage, error) { - subsService := service.Projects.Subscriptions - req := &pubsub.PullRequest{ - ReturnImmediately: true, - MaxMessages: maxPullLogMessages, - } - - resp, err := subsService.Pull(subs.Name, req).Do() - if err != nil { - return nil, err - } - - var ids []string - for _, msg := range resp.ReceivedMessages { - ids = append(ids, msg.AckId) - } - if len(ids) > 0 { - ackReq := &pubsub.AcknowledgeRequest{AckIds: ids} - if _, err = subsService.Acknowledge(subs.Name, ackReq).Do(); err != nil { - framework.Logf("Failed to ack poll: %v", err) - } - } - - return resp.ReceivedMessages, nil -} - -func publish(service *pubsub.Service, topic *pubsub.Topic, msg string) error { - topicsService := service.Projects.Topics - req := &pubsub.PublishRequest{ - Messages: []*pubsub.PubsubMessage{ - { - Data: base64.StdEncoding.EncodeToString([]byte(msg)), - }, - }, - } - _, err := topicsService.Publish(topic.Name, req).Do() - return err -} diff --git a/test/e2e/instrumentation/logging/stackdrvier/BUILD b/test/e2e/instrumentation/logging/stackdrvier/BUILD new file mode 100644 index 00000000000..f8e149d084d --- /dev/null +++ 
b/test/e2e/instrumentation/logging/stackdrvier/BUILD @@ -0,0 +1,44 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "basic.go", + "soak.go", + "utils.go", + ], + tags = ["automanaged"], + deps = [ + "//test/e2e/framework:go_default_library", + "//test/e2e/instrumentation/common:go_default_library", + "//test/e2e/instrumentation/logging/utils:go_default_library", + "//vendor/github.com/onsi/ginkgo:go_default_library", + "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/golang.org/x/oauth2/google:go_default_library", + "//vendor/google.golang.org/api/logging/v2beta1:go_default_library", + "//vendor/google.golang.org/api/pubsub/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/test/e2e/instrumentation/logging/stackdrvier/basic.go b/test/e2e/instrumentation/logging/stackdrvier/basic.go new file mode 100644 index 00000000000..74b1a7a0d41 --- /dev/null +++ b/test/e2e/instrumentation/logging/stackdrvier/basic.go @@ -0,0 +1,152 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package stackdriver + +import ( + "fmt" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/test/e2e/framework" + instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" + "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils" + + "github.com/onsi/ginkgo" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/uuid" +) + +const ( + ingestionInterval = 10 * time.Second + ingestionTimeout = 10 * time.Minute +) + +var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver", func() { + f := framework.NewDefaultFramework("sd-logging") + + ginkgo.BeforeEach(func() { + framework.SkipUnlessProviderIs("gce", "gke") + }) + + ginkgo.It("should ingest logs", func() { + + withLogProviderForScope(f, podsScope, func(p *sdLogProvider) { + ginkgo.By("Checking ingesting text logs", func() { + pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger-1", "hey"), f) + framework.ExpectNoError(err, "Failed to start a pod") + + ginkgo.By("Waiting for logs to ingest") + c := utils.NewLogChecker(p, utils.UntilFirstEntry, utils.JustTimeout, pod.Name()) + err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) + framework.ExpectNoError(err) + }) + + ginkgo.By("Checking ingesting json logs", func() { + logRaw := "{\"a\":\"b\"}" + pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger-2", logRaw), f) + framework.ExpectNoError(err, "Failed to start a pod") + + ginkgo.By("Waiting for logs to ingest") + c := utils.NewLogChecker(p, func(_ string, logEntries []utils.LogEntry) (bool, error) { + if len(logEntries) == 0 { + return false, nil + } + log := logEntries[0] + if log.JSONPayload == nil { + return false, fmt.Errorf("log entry unexpectedly is not json: %s", log.TextPayload) + } + if log.JSONPayload["a"] != "b" { + bytes, err := json.Marshal(log.JSONPayload) + if err != nil { + return false, fmt.Errorf("log entry ingested incorrectly, failed to marshal: %v", err) + } + return false, fmt.Errorf("log entry ingested incorrectly, got %v, want %s", + string(bytes), logRaw) + } + return true, nil + }, utils.JustTimeout, pod.Name()) + err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) + framework.ExpectNoError(err) + }) + + ginkgo.By("Checking ingesting logs in glog format", func() { + logUnformatted := "Text" + logRaw := fmt.Sprintf("I0101 00:00:00.000000 1 main.go:1] %s", logUnformatted) + pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger-3", logRaw), f) + framework.ExpectNoError(err, "Failed to start a pod") + + ginkgo.By("Waiting for logs to ingest") + c := utils.NewLogChecker(p, func(_ string, logEntries []utils.LogEntry) (bool, error) { + if len(logEntries) == 0 { + return false, nil + } + log := logEntries[0] + if log.TextPayload == "" { + return false, fmt.Errorf("log entry is unexpectedly json: %v", log.JSONPayload) + } + if log.TextPayload != logUnformatted { + return false, fmt.Errorf("log entry ingested incorrectly, got %s, want %s", + log.TextPayload, logUnformatted) + } + return true, nil + }, utils.JustTimeout, pod.Name()) + err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) + framework.ExpectNoError(err) + }) + }) + }) + + ginkgo.It("should ingest events", func() { + eventCreationInterval := 10 * time.Second + + withLogProviderForScope(f, eventsScope, func(p *sdLogProvider) { + ginkgo.By("Running pods to generate events while waiting for some of them to be ingested") + stopCh := make(chan struct{}) + cleanupCh := make(chan 
struct{})
+			defer func() { <-cleanupCh }()
+			defer close(stopCh)
+			go func() {
+				defer ginkgo.GinkgoRecover()
+				defer close(cleanupCh)
+
+				wait.PollUntil(eventCreationInterval, func() (bool, error) {
+					podName := fmt.Sprintf("synthlogger-%s", string(uuid.NewUUID()))
+					err := utils.NewLoadLoggingPod(podName, "", 1, 1*time.Second).Start(f)
+					if err != nil {
+						framework.Logf("Failed to create a logging pod: %v", err)
+					}
+					return false, nil
+				}, stopCh)
+			}()
+
+			ginkgo.By("Waiting for events to ingest")
+			c := utils.NewLogChecker(p, utils.UntilFirstEntry, utils.JustTimeout, "")
+			err := utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
+			framework.ExpectNoError(err)
+		})
+	})
+
+	ginkgo.It("should ingest system logs from all nodes", func() {
+		withLogProviderForScope(f, systemScope, func(p *sdLogProvider) {
+			ginkgo.By("Waiting for some system logs to ingest")
+			nodeIds := utils.GetNodeIds(f.ClientSet)
+			c := utils.NewLogChecker(p, utils.UntilFirstEntry, utils.JustTimeout, nodeIds...)
+			err := utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
+			framework.ExpectNoError(err)
+		})
+	})
+})
diff --git a/test/e2e/instrumentation/logging/stackdrvier/soak.go b/test/e2e/instrumentation/logging/stackdrvier/soak.go
new file mode 100644
index 00000000000..92d94764a5f
--- /dev/null
+++ b/test/e2e/instrumentation/logging/stackdrvier/soak.go
@@ -0,0 +1,102 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package stackdriver
+
+import (
+	"fmt"
+	"math"
+	"time"
+
+	"k8s.io/kubernetes/test/e2e/framework"
+	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
+	"k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"
+
+	"github.com/onsi/ginkgo"
+)
+
+const (
+	// maxAllowedLostFraction is the fraction of lost logs considered acceptable.
+	maxAllowedLostFraction = 0.01
+	// maxAllowedRestartsPerHour is the number of fluentd container restarts
+	// considered acceptable. Once per hour is fine for now, as long as it
+	// doesn't lose too many logs.
+	maxAllowedRestartsPerHour = 1.0
+	// lastPodIngestionSlack is the amount of time to wait for the last pod's
+	// logs to be ingested by the logging agent.
+ lastPodIngestionSlack = 5 * time.Minute +) + +var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() { + f := framework.NewDefaultFramework("sd-logging-load") + + ginkgo.It("should ingest logs from applications running for a prolonged amount of time", func() { + withLogProviderForScope(f, podsScope, func(p *sdLogProvider) { + nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet).Items + maxPodCount := 10 + jobDuration := 30 * time.Minute + linesPerPodPerSecond := 100 + // TODO(crassirostris): Increase to 21 hrs + testDuration := 3 * time.Hour + ingestionInterval := 1 * time.Minute + ingestionTimeout := testDuration + 30*time.Minute + allowedRestarts := int(math.Ceil(float64(testDuration) / + float64(time.Hour) * maxAllowedRestartsPerHour)) + + podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount)) + podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1 + linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds()) + + // pods is a flat array of all pods to be run and to expect in Stackdriver. + pods := []utils.FiniteLoggingPod{} + // podsByRun is a two-dimensional array of pods, first dimension is the run + // index, the second dimension is the node index. Since we want to create + // an equal load on all nodes, for the same run we have one pod per node. + podsByRun := [][]utils.FiniteLoggingPod{} + for runIdx := 0; runIdx < podRunCount; runIdx++ { + podsInRun := []utils.FiniteLoggingPod{} + for nodeIdx, node := range nodes { + podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx) + pod := utils.NewLoadLoggingPod(podName, node.Name, linesPerPod, jobDuration) + pods = append(pods, pod) + podsInRun = append(podsInRun, pod) + } + podsByRun = append(podsByRun, podsInRun) + } + + ginkgo.By("Running short-lived pods") + go func() { + t := time.NewTicker(podRunDelay) + defer t.Stop() + for runIdx := 0; runIdx < podRunCount; runIdx++ { + // Start one pod on each node. + for _, pod := range podsByRun[runIdx] { + if err := pod.Start(f); err != nil { + framework.Logf("Failed to start pod: %v", err) + } + } + <-t.C + } + }() + + checker := utils.NewFullIngestionPodLogChecker(p, maxAllowedLostFraction, pods...) + err := utils.WaitForLogs(checker, ingestionInterval, ingestionTimeout) + framework.ExpectNoError(err) + + err = utils.EnsureLoggingAgentRestartsCount(f, p.LoggingAgentName(), allowedRestarts) + framework.ExpectNoError(err) + }) + }) +}) diff --git a/test/e2e/instrumentation/logging/stackdrvier/utils.go b/test/e2e/instrumentation/logging/stackdrvier/utils.go new file mode 100644 index 00000000000..4c28bccbee8 --- /dev/null +++ b/test/e2e/instrumentation/logging/stackdrvier/utils.go @@ -0,0 +1,388 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package stackdriver + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "strings" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils" + + "golang.org/x/net/context" + "golang.org/x/oauth2/google" + sd "google.golang.org/api/logging/v2beta1" + pubsub "google.golang.org/api/pubsub/v1" +) + +const ( + // The amount of time to wait for the Stackdriver Logging + // sink to become operational. + sinkStartupTimeout = 10 * time.Minute + + // The limit on the number of messages to pull from PubSub. + maxPullLogMessages = 100 * 1000 + + // maxQueueSize is the limit on the number of messages in a single queue. + maxQueueSize = 10 * 1000 + + // The polling interval for the PubSub topic with log entries. + sdLoggingPollInterval = 100 * time.Millisecond +) + +type logProviderScope int + +const ( + podsScope logProviderScope = iota + eventsScope + systemScope +) + +var _ utils.LogProvider = &sdLogProvider{} + +type sdLogProvider struct { + sdService *sd.Service + pubsubService *pubsub.Service + + framework *framework.Framework + + topic *pubsub.Topic + subscription *pubsub.Subscription + logSink *sd.LogSink + + pollingStopChannel chan struct{} + + queueCollection utils.LogsQueueCollection + + scope logProviderScope +} + +func newSdLogProvider(f *framework.Framework, scope logProviderScope) (*sdLogProvider, error) { + ctx := context.Background() + hc, err := google.DefaultClient(ctx, sd.CloudPlatformScope) + if err != nil { + return nil, err + } + + sdService, err := sd.New(hc) + if err != nil { + return nil, err + } + + pubsubService, err := pubsub.New(hc) + if err != nil { + return nil, err + } + + provider := &sdLogProvider{ + scope: scope, + sdService: sdService, + pubsubService: pubsubService, + framework: f, + pollingStopChannel: make(chan struct{}, 1), + queueCollection: utils.NewLogsQueueCollection(maxQueueSize), + } + return provider, nil +} + +func (p *sdLogProvider) Init() error { + projectID := framework.TestContext.CloudConfig.ProjectID + nsName := p.framework.Namespace.Name + + topic, err := p.createPubSubTopic(projectID, nsName) + if err != nil { + return fmt.Errorf("failed to create PubSub topic: %v", err) + } + p.topic = topic + + subs, err := p.createPubSubSubscription(projectID, nsName, topic.Name) + if err != nil { + return fmt.Errorf("failed to create PubSub subscription: %v", err) + } + p.subscription = subs + + logSink, err := p.createSink(projectID, nsName, topic.Name) + if err != nil { + return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err) + } + p.logSink = logSink + + if err = p.authorizeSink(); err != nil { + return fmt.Errorf("failed to authorize log sink: %v", err) + } + + if err = p.waitSinkInit(); err != nil { + return fmt.Errorf("failed to wait for sink to become operational: %v", err) + } + + go p.pollLogs() + + return nil +} + +func (p *sdLogProvider) Cleanup() { + p.pollingStopChannel <- struct{}{} + + if p.logSink != nil { + projectID := framework.TestContext.CloudConfig.ProjectID + sinkNameID := fmt.Sprintf("projects/%s/sinks/%s", projectID, p.logSink.Name) + sinksService := p.sdService.Projects.Sinks + if _, err := sinksService.Delete(sinkNameID).Do(); err != nil { + framework.Logf("Failed to delete LogSink: %v", err) + } + } + + if p.subscription != nil { + subsService := p.pubsubService.Projects.Subscriptions + if _, err := subsService.Delete(p.subscription.Name).Do(); err != nil { + framework.Logf("Failed to delete PubSub subscription: %v", err) + } + } + + if p.topic != nil { + topicsService
:= p.pubsubService.Projects.Topics + if _, err := topicsService.Delete(p.topic.Name).Do(); err != nil { + framework.Logf("Failed to delete PubSub topic: %v", err) + } + } +} + +func (p *sdLogProvider) ReadEntries(name string) []utils.LogEntry { + return p.queueCollection.Pop(name) +} + +func (p *sdLogProvider) LoggingAgentName() string { + return "fluentd-gcp" +} + +func (p *sdLogProvider) createPubSubTopic(projectID, topicName string) (*pubsub.Topic, error) { + topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectID, topicName) + topic := &pubsub.Topic{ + Name: topicFullName, + } + return p.pubsubService.Projects.Topics.Create(topicFullName, topic).Do() +} + +func (p *sdLogProvider) createPubSubSubscription(projectID, subsName, topicName string) (*pubsub.Subscription, error) { + subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subsName) + subs := &pubsub.Subscription{ + Name: subsFullName, + Topic: topicName, + } + return p.pubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do() +} + +func (p *sdLogProvider) createSink(projectID, sinkName, topicName string) (*sd.LogSink, error) { + filter, err := p.buildFilter() + if err != nil { + return nil, err + } + framework.Logf("Using the following filter for log entries: %s", filter) + sink := &sd.LogSink{ + Name: sinkName, + Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName), + Filter: filter, + } + projectDst := fmt.Sprintf("projects/%s", projectID) + return p.sdService.Projects.Sinks.Create(projectDst, sink).Do() +} + +func (p *sdLogProvider) buildFilter() (string, error) { + switch p.scope { + case podsScope: + return fmt.Sprintf("resource.type=\"container\" AND resource.labels.namespace_id=\"%s\"", + p.framework.Namespace.Name), nil + case eventsScope: + return fmt.Sprintf("resource.type=\"gke_cluster\" AND jsonPayload.metadata.namespace=\"%s\"", + p.framework.Namespace.Name), nil + case systemScope: + nodeFilters := []string{} + for _, nodeID := range utils.GetNodeIDs(p.framework.ClientSet) { + nodeFilter := fmt.Sprintf("resource.labels.instance_id=%s", nodeID) + nodeFilters = append(nodeFilters, nodeFilter) + } + return fmt.Sprintf("resource.type=\"gce_instance\" AND (%s)", + strings.Join(nodeFilters, " OR ")), nil + } + return "", fmt.Errorf("unknown log provider scope: %v", p.scope) +} + +func (p *sdLogProvider) authorizeSink() error { + topicsService := p.pubsubService.Projects.Topics + policy, err := topicsService.GetIamPolicy(p.topic.Name).Do() + if err != nil { + return err + } + + binding := &pubsub.Binding{ + Role: "roles/pubsub.publisher", + Members: []string{p.logSink.WriterIdentity}, + } + policy.Bindings = append(policy.Bindings, binding) + req := &pubsub.SetIamPolicyRequest{Policy: policy} + if _, err = topicsService.SetIamPolicy(p.topic.Name, req).Do(); err != nil { + return err + } + + return nil +} + +func (p *sdLogProvider) waitSinkInit() error { + framework.Logf("Waiting for log sink to become operational") + return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) { + err := publish(p.pubsubService, p.topic, "embrace eternity") + if err != nil { + framework.Logf("Failed to push message to PubSub due to %v", err) + } + + messages, err := pullAndAck(p.pubsubService, p.subscription) + if err != nil { + framework.Logf("Failed to pull messages from PubSub due to %v", err) + return false, nil + } + if len(messages) > 0 { + framework.Logf("Sink %s is operational", p.logSink.Name) + return true, nil + } + + return false, nil + }) +} + +func (p *sdLogProvider)
pollLogs() { + wait.PollUntil(sdLoggingPollInterval, func() (bool, error) { + messages, err := pullAndAck(p.pubsubService, p.subscription) + if err != nil { + framework.Logf("Failed to pull messages from PubSub due to %v", err) + return false, nil + } + + for _, msg := range messages { + logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data) + if err != nil { + framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data) + continue + } + + var sdLogEntry sd.LogEntry + if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil { + framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err) + continue + } + + name, ok := p.tryGetName(sdLogEntry) + if !ok { + framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type) + continue + } + + logEntry, err := convertLogEntry(sdLogEntry) + if err != nil { + framework.Logf("Failed to parse Stackdriver LogEntry: %v", err) + continue + } + + p.queueCollection.Push(name, logEntry) + } + + return false, nil + }, p.pollingStopChannel) +} + +func (p *sdLogProvider) tryGetName(sdLogEntry sd.LogEntry) (string, bool) { + switch sdLogEntry.Resource.Type { + case "container": + return sdLogEntry.Resource.Labels["pod_id"], true + case "gke_cluster": + return "", true + case "gce_instance": + return sdLogEntry.Resource.Labels["instance_id"], true + } + return "", false +} + +func convertLogEntry(sdLogEntry sd.LogEntry) (utils.LogEntry, error) { + if sdLogEntry.TextPayload != "" { + return utils.LogEntry{TextPayload: sdLogEntry.TextPayload}, nil + } + + bytes, err := sdLogEntry.JsonPayload.MarshalJSON() + if err != nil { + return utils.LogEntry{}, fmt.Errorf("failed to get jsonPayload from LogEntry %v", sdLogEntry) + } + + var jsonObject map[string]interface{} + err = json.Unmarshal(bytes, &jsonObject) + if err != nil { + return utils.LogEntry{}, + fmt.Errorf("failed to deserialize jsonPayload as json object %s", string(bytes)) + } + return utils.LogEntry{JSONPayload: jsonObject}, nil +} + +func pullAndAck(service *pubsub.Service, subs *pubsub.Subscription) ([]*pubsub.ReceivedMessage, error) { + subsService := service.Projects.Subscriptions + req := &pubsub.PullRequest{ + ReturnImmediately: true, + MaxMessages: maxPullLogMessages, + } + + resp, err := subsService.Pull(subs.Name, req).Do() + if err != nil { + return nil, err + } + + var ids []string + for _, msg := range resp.ReceivedMessages { + ids = append(ids, msg.AckId) + } + if len(ids) > 0 { + ackReq := &pubsub.AcknowledgeRequest{AckIds: ids} + if _, err = subsService.Acknowledge(subs.Name, ackReq).Do(); err != nil { + framework.Logf("Failed to ack poll: %v", err) + } + } + + return resp.ReceivedMessages, nil +} + +func publish(service *pubsub.Service, topic *pubsub.Topic, msg string) error { + topicsService := service.Projects.Topics + req := &pubsub.PublishRequest{ + Messages: []*pubsub.PubsubMessage{ + { + Data: base64.StdEncoding.EncodeToString([]byte(msg)), + }, + }, + } + _, err := topicsService.Publish(topic.Name, req).Do() + return err +} + +func withLogProviderForScope(f *framework.Framework, scope logProviderScope, fun func(*sdLogProvider)) { + p, err := newSdLogProvider(f, scope) + framework.ExpectNoError(err, "Failed to create Stackdriver logs provider") + + err = p.Init() + defer p.Cleanup() + framework.ExpectNoError(err, "Failed to init Stackdriver logs provider") + + err = utils.EnsureLoggingAgentDeployment(f, p.LoggingAgentName()) + framework.ExpectNoError(err, "Logging
agents deployed incorrectly") + + fun(p) +} diff --git a/test/e2e/instrumentation/logging/utils.go b/test/e2e/instrumentation/logging/utils.go deleted file mode 100644 index 420ef5035b5..00000000000 --- a/test/e2e/instrumentation/logging/utils.go +++ /dev/null @@ -1,320 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package logging - -import ( - "fmt" - "regexp" - "strconv" - "time" - - api_v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/util/integer" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/test/e2e/framework" -) - -const ( - // Duration of delay between any two attempts to check if all logs are ingested - ingestionRetryDelay = 30 * time.Second - - // Amount of requested cores for logging container in millicores - loggingContainerCpuRequest = 10 - - // Amount of requested memory for logging container in bytes - loggingContainerMemoryRequest = 10 * 1024 * 1024 - - // Name of the container used for logging tests - loggingContainerName = "logging-container" -) - -var ( - // Regexp, matching the contents of log entries, parsed or not - logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ \\d+ logs_generator.go:67] )?(\\d+) .*") -) - -type logEntry struct { - Payload string -} - -func (entry logEntry) getLogEntryNumber() (int, bool) { - submatch := logEntryMessageRegex.FindStringSubmatch(entry.Payload) - if submatch == nil || len(submatch) < 2 { - return 0, false - } - - lineNumber, err := strconv.Atoi(submatch[1]) - return lineNumber, err == nil -} - -type logsProvider interface { - Init() error - Cleanup() - ReadEntries(*loggingPod) []logEntry - FluentdApplicationName() string -} - -type loggingTestConfig struct { - LogsProvider logsProvider - Pods []*loggingPod - IngestionTimeout time.Duration - MaxAllowedLostFraction float64 - MaxAllowedFluentdRestarts int -} - -// Type to track the progress of logs generating pod -type loggingPod struct { - // Name equals to the pod name and the container name. - Name string - // NodeName is the name of the node this pod will be - // assigned to. Can be empty. - NodeName string - // Occurrences is a cache of ingested and read entries. - Occurrences map[int]logEntry - // ExpectedLinesNumber is the number of lines that are - // expected to be ingested from this pod. - ExpectedLinesNumber int - // RunDuration is how long the pod will live. 
- RunDuration time.Duration -} - -func newLoggingPod(podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod { - return &loggingPod{ - Name: podName, - NodeName: nodeName, - Occurrences: make(map[int]logEntry), - ExpectedLinesNumber: totalLines, - RunDuration: loggingDuration, - } -} - -func (p *loggingPod) Start(f *framework.Framework) { - framework.Logf("Starting pod %s", p.Name) - f.PodClient().Create(&api_v1.Pod{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: p.Name, - }, - Spec: api_v1.PodSpec{ - RestartPolicy: api_v1.RestartPolicyNever, - Containers: []api_v1.Container{ - { - Name: loggingContainerName, - Image: "gcr.io/google_containers/logs-generator:v0.1.0", - Env: []api_v1.EnvVar{ - { - Name: "LOGS_GENERATOR_LINES_TOTAL", - Value: strconv.Itoa(p.ExpectedLinesNumber), - }, - { - Name: "LOGS_GENERATOR_DURATION", - Value: p.RunDuration.String(), - }, - }, - Resources: api_v1.ResourceRequirements{ - Requests: api_v1.ResourceList{ - api_v1.ResourceCPU: *resource.NewMilliQuantity( - loggingContainerCpuRequest, - resource.DecimalSI), - api_v1.ResourceMemory: *resource.NewQuantity( - loggingContainerMemoryRequest, - resource.BinarySI), - }, - }, - }, - }, - NodeName: p.NodeName, - }, - }) -} - -func startNewLoggingPod(f *framework.Framework, podName string, nodeName string, totalLines int, loggingDuration time.Duration) *loggingPod { - pod := newLoggingPod(podName, nodeName, totalLines, loggingDuration) - pod.Start(f) - return pod -} - -func waitForSomeLogs(f *framework.Framework, config *loggingTestConfig) error { - podHasIngestedLogs := make([]bool, len(config.Pods)) - podWithIngestedLogsCount := 0 - - for start := time.Now(); time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) { - for podIdx, pod := range config.Pods { - if podHasIngestedLogs[podIdx] { - continue - } - - entries := config.LogsProvider.ReadEntries(pod) - if len(entries) == 0 { - framework.Logf("No log entries from pod %s", pod.Name) - continue - } - - for _, entry := range entries { - if _, ok := entry.getLogEntryNumber(); ok { - framework.Logf("Found some log entries from pod %s", pod.Name) - podHasIngestedLogs[podIdx] = true - podWithIngestedLogsCount++ - break - } - } - } - - if podWithIngestedLogsCount == len(config.Pods) { - break - } - } - - if podWithIngestedLogsCount < len(config.Pods) { - return fmt.Errorf("some logs were ingested for %d pods out of %d", podWithIngestedLogsCount, len(config.Pods)) - } - - return nil -} - -func waitForFullLogsIngestion(f *framework.Framework, config *loggingTestConfig) error { - expectedLinesNumber := 0 - for _, pod := range config.Pods { - expectedLinesNumber += pod.ExpectedLinesNumber - } - - totalMissing := expectedLinesNumber - - missingByPod := make([]int, len(config.Pods)) - for podIdx, pod := range config.Pods { - missingByPod[podIdx] = pod.ExpectedLinesNumber - } - - for start := time.Now(); time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) { - missing := 0 - for podIdx, pod := range config.Pods { - if missingByPod[podIdx] == 0 { - continue - } - - missingByPod[podIdx] = pullMissingLogsCount(config.LogsProvider, pod) - missing += missingByPod[podIdx] - } - - totalMissing = missing - if totalMissing > 0 { - framework.Logf("Still missing %d lines in total", totalMissing) - } else { - break - } - } - - lostFraction := float64(totalMissing) / float64(expectedLinesNumber) - - if totalMissing > 0 { - framework.Logf("After %v still missing %d lines, %.2f%% of total number of lines", - 
config.IngestionTimeout, totalMissing, lostFraction*100) - for podIdx, missing := range missingByPod { - if missing != 0 { - framework.Logf("Still missing %d lines for pod %s", missing, config.Pods[podIdx].Name) - } - } - } - - if lostFraction > config.MaxAllowedLostFraction { - return fmt.Errorf("lost %.2f%% of lines, but only loss of %.2f%% can be tolerated", - lostFraction*100, config.MaxAllowedLostFraction*100) - } - - fluentdPods, err := getFluentdPods(f, config.LogsProvider.FluentdApplicationName()) - if err != nil { - return fmt.Errorf("failed to get fluentd pods due to %v", err) - } - - maxRestartCount := 0 - for _, fluentdPod := range fluentdPods.Items { - restartCount := int(fluentdPod.Status.ContainerStatuses[0].RestartCount) - maxRestartCount = integer.IntMax(maxRestartCount, restartCount) - - framework.Logf("Fluentd pod %s on node %s was restarted %d times", - fluentdPod.Name, fluentdPod.Spec.NodeName, restartCount) - } - - if maxRestartCount > config.MaxAllowedFluentdRestarts { - return fmt.Errorf("max fluentd pod restarts was %d, which is more than allowed %d", - maxRestartCount, config.MaxAllowedFluentdRestarts) - } - - return nil -} - -func pullMissingLogsCount(logsProvider logsProvider, pod *loggingPod) int { - missingOnPod, err := getMissingLinesCount(logsProvider, pod) - if err != nil { - framework.Logf("Failed to get missing lines count from pod %s due to %v", pod.Name, err) - return pod.ExpectedLinesNumber - } - - return missingOnPod -} - -func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, error) { - entries := logsProvider.ReadEntries(pod) - - for _, entry := range entries { - lineNumber, ok := entry.getLogEntryNumber() - if !ok { - continue - } - - if lineNumber < 0 || lineNumber >= pod.ExpectedLinesNumber { - framework.Logf("Unexpected line number: %d", lineNumber) - } else { - pod.Occurrences[lineNumber] = entry - } - } - - return pod.ExpectedLinesNumber - len(pod.Occurrences), nil -} - -func ensureSingleFluentdOnEachNode(f *framework.Framework, fluentdApplicationName string) error { - fluentdPodList, err := getFluentdPods(f, fluentdApplicationName) - if err != nil { - return err - } - - fluentdPodsPerNode := make(map[string]int) - for _, fluentdPod := range fluentdPodList.Items { - fluentdPodsPerNode[fluentdPod.Spec.NodeName]++ - } - - nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet) - for _, node := range nodeList.Items { - fluentdPodCount, ok := fluentdPodsPerNode[node.Name] - - if !ok { - return fmt.Errorf("node %s doesn't have fluentd instance", node.Name) - } else if fluentdPodCount != 1 { - return fmt.Errorf("node %s contains %d fluentd instaces, expected exactly one", node.Name, fluentdPodCount) - } - } - - return nil -} - -func getFluentdPods(f *framework.Framework, fluentdApplicationName string) (*api_v1.PodList, error) { - label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": fluentdApplicationName})) - options := meta_v1.ListOptions{LabelSelector: label.String()} - return f.ClientSet.Core().Pods(api.NamespaceSystem).List(options) -} diff --git a/test/e2e/instrumentation/logging/utils/BUILD b/test/e2e/instrumentation/logging/utils/BUILD new file mode 100644 index 00000000000..957327136f0 --- /dev/null +++ b/test/e2e/instrumentation/logging/utils/BUILD @@ -0,0 +1,45 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = [ + "log_provider.go", + 
"logging_agent.go", + "logging_pod.go", + "misc.go", + "types.go", + "wait.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/api:go_default_library", + "//test/e2e/framework:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/util/integer:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/test/e2e/instrumentation/logging/utils/log_provider.go b/test/e2e/instrumentation/logging/utils/log_provider.go new file mode 100644 index 00000000000..55b2684e54c --- /dev/null +++ b/test/e2e/instrumentation/logging/utils/log_provider.go @@ -0,0 +1,25 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +// LogProvider interface provides an API to get logs from the logging backend. +type LogProvider interface { + Init() error + Cleanup() + ReadEntries(name string) []LogEntry + LoggingAgentName() string +} diff --git a/test/e2e/instrumentation/logging/utils/logging_agent.go b/test/e2e/instrumentation/logging/utils/logging_agent.go new file mode 100644 index 00000000000..4ca60a6c4a4 --- /dev/null +++ b/test/e2e/instrumentation/logging/utils/logging_agent.go @@ -0,0 +1,86 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "fmt" + + api_v1 "k8s.io/api/core/v1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/util/integer" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/test/e2e/framework" +) + +// EnsureLoggingAgentDeployment checks that logging agent is present on each +// node and returns an error if that's not true. 
+func EnsureLoggingAgentDeployment(f *framework.Framework, appName string) error { + agentPods, err := getLoggingAgentPods(f, appName) + if err != nil { + return fmt.Errorf("failed to get logging agent pods: %v", err) + } + + agentPerNode := make(map[string]int) + for _, pod := range agentPods.Items { + agentPerNode[pod.Spec.NodeName]++ + } + + nodeList := framework.GetReadySchedulableNodesOrDie(f.ClientSet) + for _, node := range nodeList.Items { + agentPodsCount, ok := agentPerNode[node.Name] + + if !ok { + return fmt.Errorf("node %s doesn't have a logging agent, want 1", node.Name) + } else if agentPodsCount != 1 { + return fmt.Errorf("node %s has %d logging agents, want 1", node.Name, agentPodsCount) + } + } + + return nil +} + +// EnsureLoggingAgentRestartsCount checks that each logging agent was restarted +// no more than maxRestarts times and returns an error if there's a pod which +// exceeds this number of restarts. +func EnsureLoggingAgentRestartsCount(f *framework.Framework, appName string, maxRestarts int) error { + agentPods, err := getLoggingAgentPods(f, appName) + if err != nil { + return fmt.Errorf("failed to get logging agent pods: %v", err) + } + + maxRestartCount := 0 + for _, pod := range agentPods.Items { + restartCount := int(pod.Status.ContainerStatuses[0].RestartCount) + maxRestartCount = integer.IntMax(maxRestartCount, restartCount) + + framework.Logf("Logging agent %s on node %s was restarted %d times", + pod.Name, pod.Spec.NodeName, restartCount) + } + + if maxRestartCount > maxRestarts { + return fmt.Errorf("max logging agent restarts was %d, which is more than allowed %d", + maxRestartCount, maxRestarts) + } + return nil +} + +func getLoggingAgentPods(f *framework.Framework, appName string) (*api_v1.PodList, error) { + label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": appName})) + options := meta_v1.ListOptions{LabelSelector: label.String()} + return f.ClientSet.Core().Pods(api.NamespaceSystem).List(options) +} diff --git a/test/e2e/instrumentation/logging/utils/logging_pod.go b/test/e2e/instrumentation/logging/utils/logging_pod.go new file mode 100644 index 00000000000..f950c64460b --- /dev/null +++ b/test/e2e/instrumentation/logging/utils/logging_pod.go @@ -0,0 +1,188 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "fmt" + "strconv" + "time" + + api_v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/test/e2e/framework" +) + +const ( + // Amount of requested cores for logging container in millicores + loggingContainerCPURequest = 10 + + // Amount of requested memory for logging container in bytes + loggingContainerMemoryRequest = 10 * 1024 * 1024 + + // Name of the container used for logging tests + loggingContainerName = "logging-container" +) + +// LoggingPod is an interface for a pod that can be started and that logs +// something to its stdout, possibly indefinitely.
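+// A typical flow, as used by the Stackdriver tests, looks, for example, like: +// +// pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger", "hey"), f) +// framework.ExpectNoError(err, "Failed to start a pod")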
+type LoggingPod interface { + // Name returns the Kubernetes pod name. + Name() string + + // Start method controls when the logging pod is started in the cluster. + Start(f *framework.Framework) error +} + +// StartAndReturnSelf is a helper method to start a logging pod and +// immediately return it. +func StartAndReturnSelf(p LoggingPod, f *framework.Framework) (LoggingPod, error) { + err := p.Start(f) + return p, err +} + +// FiniteLoggingPod is a logging pod that emits a known number of log lines. +type FiniteLoggingPod interface { + LoggingPod + + // ExpectedLineCount returns the number of lines that are + // expected to be ingested from this pod. + ExpectedLineCount() int +} + +var _ FiniteLoggingPod = &loadLoggingPod{} + +type loadLoggingPod struct { + name string + nodeName string + expectedLinesCount int + runDuration time.Duration +} + +// NewLoadLoggingPod returns a logging pod that generates totalLines random +// lines over a period of length loggingDuration. Lines generated by this +// pod are numbered and have a well-defined structure. +func NewLoadLoggingPod(podName string, nodeName string, totalLines int, + loggingDuration time.Duration) FiniteLoggingPod { + return &loadLoggingPod{ + name: podName, + nodeName: nodeName, + expectedLinesCount: totalLines, + runDuration: loggingDuration, + } +} + +func (p *loadLoggingPod) Name() string { + return p.name +} + +func (p *loadLoggingPod) Start(f *framework.Framework) error { + framework.Logf("Starting load logging pod %s", p.name) + f.PodClient().Create(&api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: p.name, + }, + Spec: api_v1.PodSpec{ + RestartPolicy: api_v1.RestartPolicyNever, + Containers: []api_v1.Container{ + { + Name: loggingContainerName, + Image: "gcr.io/google_containers/logs-generator:v0.1.0", + Env: []api_v1.EnvVar{ + { + Name: "LOGS_GENERATOR_LINES_TOTAL", + Value: strconv.Itoa(p.expectedLinesCount), + }, + { + Name: "LOGS_GENERATOR_DURATION", + Value: p.runDuration.String(), + }, + }, + Resources: api_v1.ResourceRequirements{ + Requests: api_v1.ResourceList{ + api_v1.ResourceCPU: *resource.NewMilliQuantity( + loggingContainerCPURequest, + resource.DecimalSI), + api_v1.ResourceMemory: *resource.NewQuantity( + loggingContainerMemoryRequest, + resource.BinarySI), + }, + }, + }, + }, + NodeName: p.nodeName, + }, + }) + return framework.WaitForPodNameRunningInNamespace(f.ClientSet, p.name, f.Namespace.Name) +} + +func (p *loadLoggingPod) ExpectedLineCount() int { + return p.expectedLinesCount +} + +var _ LoggingPod = &repeatingLoggingPod{} + +type repeatingLoggingPod struct { + name string + line string +} + +// NewRepeatingLoggingPod returns a logging pod that prints the given line +// to its stdout every second.
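+// The pod never terminates on its own: it runs a busybox container that +// echoes the line in an endless shell loop.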
+func NewRepeatingLoggingPod(podName string, line string) LoggingPod { + return &repeatingLoggingPod{ + name: podName, + line: line, + } +} + +func (p *repeatingLoggingPod) Name() string { + return p.name +} + +func (p *repeatingLoggingPod) Start(f *framework.Framework) error { + framework.Logf("Starting repeating logging pod %s", p.name) + f.PodClient().Create(&api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: p.name, + }, + Spec: api_v1.PodSpec{ + Containers: []api_v1.Container{ + { + Name: loggingContainerName, + Image: "busybox", + Command: []string{ + "/bin/sh", + "-c", + fmt.Sprintf("while :; do echo '%s'; sleep 1; done", p.line), + }, + Resources: api_v1.ResourceRequirements{ + Requests: api_v1.ResourceList{ + api_v1.ResourceCPU: *resource.NewMilliQuantity( + loggingContainerCPURequest, + resource.DecimalSI), + api_v1.ResourceMemory: *resource.NewQuantity( + loggingContainerMemoryRequest, + resource.BinarySI), + }, + }, + }, + }, + }, + }) + return framework.WaitForPodNameRunningInNamespace(f.ClientSet, p.name, f.Namespace.Name) +} diff --git a/test/e2e/instrumentation/logging/utils/misc.go b/test/e2e/instrumentation/logging/utils/misc.go new file mode 100644 index 00000000000..19b4622debe --- /dev/null +++ b/test/e2e/instrumentation/logging/utils/misc.go @@ -0,0 +1,32 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/test/e2e/framework" +) + +// GetNodeIDs returns the list of node external IDs and panics in case of failure. +func GetNodeIDs(cs clientset.Interface) []string { + nodes := framework.GetReadySchedulableNodesOrDie(cs) + nodeIDs := []string{} + for _, n := range nodes.Items { + nodeIDs = append(nodeIDs, n.Spec.ExternalID) + } + return nodeIDs +} diff --git a/test/e2e/instrumentation/logging/utils/types.go b/test/e2e/instrumentation/logging/utils/types.go new file mode 100644 index 00000000000..39825cdc66d --- /dev/null +++ b/test/e2e/instrumentation/logging/utils/types.go @@ -0,0 +1,106 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "regexp" + "strconv" + "sync" +) + +var ( + // Regexp matching the contents of log entries, parsed or not + logEntryMessageRegex = regexp.MustCompile("(?:I\\d+ \\d+:\\d+:\\d+.\\d+ \\d+ logs_generator.go:67] )?(\\d+) .*") +) + +// LogEntry represents a log entry received from the logging backend.
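+// Exactly one of TextPayload and JSONPayload is set, depending on whether +// the backend was able to parse the line as JSON.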
+type LogEntry struct { + TextPayload string + JSONPayload map[string]interface{} +} + +// TryGetEntryNumber returns the number of the log entry in sequence, if it +// was generated by the load logging pod (requires special log format). +func (entry LogEntry) TryGetEntryNumber() (int, bool) { + submatch := logEntryMessageRegex.FindStringSubmatch(entry.TextPayload) + if submatch == nil || len(submatch) < 2 { + return 0, false + } + + lineNumber, err := strconv.Atoi(submatch[1]) + return lineNumber, err == nil +} + +// LogsQueueCollection is a thread-safe set of named log queues. +type LogsQueueCollection interface { + Push(name string, logs ...LogEntry) + Pop(name string) []LogEntry +} + +var _ LogsQueueCollection = &logsQueueCollection{} + +type logsQueueCollection struct { + mutex *sync.Mutex + queues map[string]chan LogEntry + queueSize int +} + +// NewLogsQueueCollection returns a new LogsQueueCollection where each queue +// is created with a capacity of queueSize. +func NewLogsQueueCollection(queueSize int) LogsQueueCollection { + return &logsQueueCollection{ + mutex: &sync.Mutex{}, + queues: map[string]chan LogEntry{}, + queueSize: queueSize, + } +} + +func (c *logsQueueCollection) Push(name string, logs ...LogEntry) { + q := c.getQueue(name) + for _, log := range logs { + q <- log + } +} + +func (c *logsQueueCollection) Pop(name string) []LogEntry { + q := c.getQueue(name) + var entries []LogEntry +pollingLoop: + for { + select { + case entry := <-q: + entries = append(entries, entry) + default: + break pollingLoop + } + } + return entries +} + +func (c *logsQueueCollection) getQueue(name string) chan LogEntry { + c.mutex.Lock() + defer c.mutex.Unlock() + + if q, ok := c.queues[name]; ok { + return q + } + + newQ := make(chan LogEntry, c.queueSize) + c.queues[name] = newQ + return newQ +} diff --git a/test/e2e/instrumentation/logging/utils/wait.go b/test/e2e/instrumentation/logging/utils/wait.go new file mode 100644 index 00000000000..6f0e44c5d6e --- /dev/null +++ b/test/e2e/instrumentation/logging/utils/wait.go @@ -0,0 +1,204 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "fmt" + "strings" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/test/e2e/framework" +) + +// LogChecker is an interface for an entity that can check whether the logging +// backend contains all wanted log entries. +type LogChecker interface { + EntriesIngested() (bool, error) + Timeout() error +} + +// IngestionPred is a function type that checks whether all required +// log entries were ingested. +type IngestionPred func(string, []LogEntry) (bool, error) + +// UntilFirstEntry is an IngestionPred that checks that at least one entry was +// ingested. +var UntilFirstEntry IngestionPred = func(_ string, entries []LogEntry) (bool, error) { + return len(entries) > 0, nil +} + +// TimeoutFun is a function that is called when the waiting times out.
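+// It receives the list of checked names and, for each of them, whether its +// entries were ingested in time.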
+type TimeoutFun func([]string, []bool) error + +// JustTimeout returns an error with the list of names for which the backend +// is still missing logs. +var JustTimeout TimeoutFun = func(names []string, ingested []bool) error { + failedNames := []string{} + for i, name := range names { + if !ingested[i] { + failedNames = append(failedNames, name) + } + } + return fmt.Errorf("timed out waiting for ingestion, still not ingested: %s", + strings.Join(failedNames, ",")) +} + +var _ LogChecker = &logChecker{} + +type logChecker struct { + provider LogProvider + names []string + ingested []bool + ingestionPred IngestionPred + timeoutFun TimeoutFun +} + +// NewLogChecker constructs a LogChecker for a list of names from custom +// IngestionPred and TimeoutFun. +func NewLogChecker(p LogProvider, pred IngestionPred, timeout TimeoutFun, names ...string) LogChecker { + return &logChecker{ + provider: p, + names: names, + ingested: make([]bool, len(names)), + ingestionPred: pred, + timeoutFun: timeout, + } +} + +func (c *logChecker) EntriesIngested() (bool, error) { + allIngested := true + for i, name := range c.names { + if c.ingested[i] { + continue + } + entries := c.provider.ReadEntries(name) + ingested, err := c.ingestionPred(name, entries) + if err != nil { + return false, err + } + if ingested { + c.ingested[i] = true + } + allIngested = allIngested && ingested + } + return allIngested, nil +} + +func (c *logChecker) Timeout() error { + return c.timeoutFun(c.names, c.ingested) +} + +// NumberedIngestionPred is an IngestionPred that takes into account sequential +// numbers of ingested entries. +type NumberedIngestionPred func(string, map[int]bool) (bool, error) + +// NumberedTimeoutFun is a TimeoutFun that takes into account sequential +// numbers of ingested entries. +type NumberedTimeoutFun func([]string, map[string]map[int]bool) error + +// NewNumberedLogChecker returns a log checker that works with numbered log +// entries generated by load logging pods. +func NewNumberedLogChecker(p LogProvider, pred NumberedIngestionPred, + timeout NumberedTimeoutFun, names ...string) LogChecker { + occs := map[string]map[int]bool{} + return NewLogChecker(p, func(name string, entries []LogEntry) (bool, error) { + occ, ok := occs[name] + if !ok { + occ = map[int]bool{} + occs[name] = occ + } + for _, entry := range entries { + if no, ok := entry.TryGetEntryNumber(); ok { + occ[no] = true + } + } + return pred(name, occ) + }, func(names []string, _ []bool) error { + return timeout(names, occs) + }, names...) +} + +// NewFullIngestionPodLogChecker returns a log checker that works with numbered +// log entries generated by load logging pods and waits until all entries are +// ingested. If the timeout is reached, a fraction of lost logs of up to slack +// is considered tolerable. +func NewFullIngestionPodLogChecker(p LogProvider, slack float64, pods ...FiniteLoggingPod) LogChecker { + podsMap := map[string]FiniteLoggingPod{} + for _, pod := range pods { + podsMap[pod.Name()] = pod + } + return NewNumberedLogChecker(p, getFullIngestionPred(podsMap), + getFullIngestionTimeout(podsMap, slack), getFiniteLoggingPodNames(pods)...)
+} + +func getFullIngestionPred(podsMap map[string]FiniteLoggingPod) NumberedIngestionPred { + return func(name string, occ map[int]bool) (bool, error) { + p := podsMap[name] + ok := len(occ) == p.ExpectedLineCount() + if !ok { + framework.Logf("Pod %s is still missing %d lines", name, p.ExpectedLineCount()-len(occ)) + } + return ok, nil + } +} + +func getFullIngestionTimeout(podsMap map[string]FiniteLoggingPod, slack float64) NumberedTimeoutFun { + return func(names []string, occs map[string]map[int]bool) error { + totalGot, totalWant := 0, 0 + podsWithLosses := []string{} + for _, name := range names { + got := len(occs[name]) + want := podsMap[name].ExpectedLineCount() + if got != want { + podsWithLosses = append(podsWithLosses, name) + } + totalGot += got + totalWant += want + } + if len(podsWithLosses) > 0 { + framework.Logf("Still missing logs from: %s", strings.Join(podsWithLosses, ", ")) + } + lostFrac := 1 - float64(totalGot)/float64(totalWant) + if lostFrac > slack { + return fmt.Errorf("still missing %.2f%% of logs, only %.2f%% is tolerable", + lostFrac*100, slack*100) + } + return nil + } +} + +// WaitForLogs polls the log checker until all entries are ingested or the +// timeout has passed. The function sleeps for interval between two +// consecutive checks. +func WaitForLogs(c LogChecker, interval, timeout time.Duration) error { + err := wait.Poll(interval, timeout, func() (bool, error) { + return c.EntriesIngested() + }) + if err == wait.ErrWaitTimeout { + return c.Timeout() + } + return err +} + +func getFiniteLoggingPodNames(pods []FiniteLoggingPod) []string { + names := []string{} + for _, p := range pods { + names = append(names, p.Name()) + } + return names +} diff --git a/test/e2e/instrumentation/monitoring/BUILD b/test/e2e/instrumentation/monitoring/BUILD index 6ddf58e6082..1c898613758 100644 --- a/test/e2e/instrumentation/monitoring/BUILD +++ b/test/e2e/instrumentation/monitoring/BUILD @@ -18,7 +18,7 @@ go_library( deps = [ "//test/e2e/common:go_default_library", "//test/e2e/framework:go_default_library", - "//test/e2e/instrumentation:go_default_library", + "//test/e2e/instrumentation/common:go_default_library", "//vendor/github.com/influxdata/influxdb/client/v2:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", "//vendor/golang.org/x/oauth2/google:go_default_library", diff --git a/test/e2e/instrumentation/monitoring/cadvisor.go b/test/e2e/instrumentation/monitoring/cadvisor.go index da87f061163..76bd4a0cf76 100644 --- a/test/e2e/instrumentation/monitoring/cadvisor.go +++ b/test/e2e/instrumentation/monitoring/cadvisor.go @@ -23,7 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" - "k8s.io/kubernetes/test/e2e/instrumentation" + instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" . "github.com/onsi/ginkgo" ) diff --git a/test/e2e/instrumentation/monitoring/influxdb.go b/test/e2e/instrumentation/monitoring/influxdb.go index 348dde9d6ca..0ea374a977a 100644 --- a/test/e2e/instrumentation/monitoring/influxdb.go +++ b/test/e2e/instrumentation/monitoring/influxdb.go @@ -29,7 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" - "k8s.io/kubernetes/test/e2e/instrumentation" + instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" .
"github.com/onsi/ginkgo" ) diff --git a/test/e2e/instrumentation/monitoring/stackdriver.go b/test/e2e/instrumentation/monitoring/stackdriver.go index 0c41722e077..3b9905e8b17 100644 --- a/test/e2e/instrumentation/monitoring/stackdriver.go +++ b/test/e2e/instrumentation/monitoring/stackdriver.go @@ -29,7 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/test/e2e/common" "k8s.io/kubernetes/test/e2e/framework" - "k8s.io/kubernetes/test/e2e/instrumentation" + instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" gcm "google.golang.org/api/monitoring/v3" )