diff --git a/test/e2e/instrumentation/logging/BUILD b/test/e2e/instrumentation/logging/BUILD index bda535982b4..97cf77d9411 100644 --- a/test/e2e/instrumentation/logging/BUILD +++ b/test/e2e/instrumentation/logging/BUILD @@ -19,7 +19,6 @@ go_library( "//test/e2e/framework/node:go_default_library", "//test/e2e/instrumentation/common:go_default_library", "//test/e2e/instrumentation/logging/elasticsearch:go_default_library", - "//test/e2e/instrumentation/logging/stackdriver:go_default_library", "//test/utils/image:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", ], @@ -37,7 +36,6 @@ filegroup( srcs = [ ":package-srcs", "//test/e2e/instrumentation/logging/elasticsearch:all-srcs", - "//test/e2e/instrumentation/logging/stackdriver:all-srcs", "//test/e2e/instrumentation/logging/utils:all-srcs", ], tags = ["automanaged"], diff --git a/test/e2e/instrumentation/logging/imports.go b/test/e2e/instrumentation/logging/imports.go index 5dd66717db1..e66db4f3994 100644 --- a/test/e2e/instrumentation/logging/imports.go +++ b/test/e2e/instrumentation/logging/imports.go @@ -18,5 +18,4 @@ package logging import ( _ "k8s.io/kubernetes/test/e2e/instrumentation/logging/elasticsearch" // for elasticsearch provider - _ "k8s.io/kubernetes/test/e2e/instrumentation/logging/stackdriver" // for stackdriver provider ) diff --git a/test/e2e/instrumentation/logging/stackdriver/BUILD b/test/e2e/instrumentation/logging/stackdriver/BUILD deleted file mode 100644 index b88a71bf7ec..00000000000 --- a/test/e2e/instrumentation/logging/stackdriver/BUILD +++ /dev/null @@ -1,44 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) - -go_library( - name = "go_default_library", - srcs = [ - "basic.go", - "soak.go", - "utils.go", - ], - importpath = "k8s.io/kubernetes/test/e2e/instrumentation/logging/stackdriver", - deps = [ - "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//test/e2e/framework:go_default_library", - "//test/e2e/framework/node:go_default_library", - "//test/e2e/framework/skipper:go_default_library", - "//test/e2e/instrumentation/common:go_default_library", - "//test/e2e/instrumentation/logging/utils:go_default_library", - "//vendor/github.com/onsi/ginkgo:go_default_library", - "//vendor/golang.org/x/oauth2/google:go_default_library", - "//vendor/google.golang.org/api/logging/v2beta1:go_default_library", - "//vendor/google.golang.org/api/option:go_default_library", - "//vendor/google.golang.org/api/pubsub/v1:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/test/e2e/instrumentation/logging/stackdriver/basic.go b/test/e2e/instrumentation/logging/stackdriver/basic.go deleted file mode 100644 index 563aa8cb5fd..00000000000 --- a/test/e2e/instrumentation/logging/stackdriver/basic.go +++ /dev/null @@ -1,195 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package stackdriver - -import ( - "fmt" - "time" - - "k8s.io/apimachinery/pkg/util/json" - "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/kubernetes/test/e2e/framework" - e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" - instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" - "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils" - - "github.com/onsi/ginkgo" -) - -const ( - ingestionInterval = 10 * time.Second - ingestionTimeout = 10 * time.Minute -) - -var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver", func() { - f := framework.NewDefaultFramework("sd-logging") - - ginkgo.BeforeEach(func() { - e2eskipper.SkipUnlessProviderIs("gce", "gke") - }) - - ginkgo.It("should ingest logs [Feature:StackdriverLogging]", func() { - withLogProviderForScope(f, podsScope, func(p *sdLogProvider) { - ginkgo.By("Checking ingesting text logs", func() { - pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger-1", "hey"), f) - framework.ExpectNoError(err, "Failed to start a pod") - - ginkgo.By("Waiting for logs to ingest") - c := utils.NewLogChecker(p, utils.UntilFirstEntry, utils.JustTimeout, pod.Name()) - err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) - framework.ExpectNoError(err) - }) - - ginkgo.By("Checking ingesting json logs", func() { - logRaw := "{\"a\":\"b\"}" - pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger-2", logRaw), f) - framework.ExpectNoError(err, "Failed to start a pod") - - ginkgo.By("Waiting for logs to ingest") - c := utils.NewLogChecker(p, func(_ string, logEntries []utils.LogEntry) (bool, error) { - if len(logEntries) == 0 { - return false, nil - } - log := logEntries[0] - if log.JSONPayload == nil { - return false, fmt.Errorf("log entry unexpectedly is not json: %s", log.TextPayload) - } - if log.JSONPayload["a"] != "b" { - bytes, err := json.Marshal(log.JSONPayload) - if err != nil { - return false, fmt.Errorf("log entry ingested incorrectly, failed to marshal: %v", err) - } - return false, fmt.Errorf("log entry ingested incorrectly, got %v, want %s", - string(bytes), logRaw) - } - return true, nil - }, utils.JustTimeout, pod.Name()) - err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) - framework.ExpectNoError(err) - }) - - ginkgo.By("Checking ingesting logs in glog format", func() { - logUnformatted := "Text" - logRaw := fmt.Sprintf("I0101 00:00:00.000000 1 main.go:1] %s", logUnformatted) - pod, err := utils.StartAndReturnSelf(utils.NewRepeatingLoggingPod("synthlogger-3", logRaw), f) - framework.ExpectNoError(err, "Failed to start a pod") - - ginkgo.By("Waiting for logs to ingest") - c := utils.NewLogChecker(p, func(_ string, logEntries []utils.LogEntry) (bool, error) { - if len(logEntries) == 0 { - return false, nil - } - log := logEntries[0] - if log.TextPayload == "" { - return false, fmt.Errorf("log entry is unexpectedly json: %v", log.JSONPayload) - } - if log.TextPayload != logUnformatted { - return false, fmt.Errorf("log entry ingested incorrectly, got %s, want %s", - log.TextPayload, logUnformatted) - } - return true, nil - }, utils.JustTimeout, pod.Name()) - err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) - framework.ExpectNoError(err) - }) - - ginkgo.By("Checking that too long lines are trimmed", func() { - maxLength := 100 * 1024 - cmd := []string{ - "/bin/sh", - "-c", - fmt.Sprintf("while :; do printf '%%*s' %d | tr ' ' 'A'; echo; sleep 60; done", maxLength+1), - } - - pod, err := utils.StartAndReturnSelf(utils.NewExecLoggingPod("synthlogger-4", cmd), f) - framework.ExpectNoError(err, "Failed to start a pod") - - ginkgo.By("Waiting for logs to ingest") - c := utils.NewLogChecker(p, func(_ string, logEntries []utils.LogEntry) (bool, error) { - if len(logEntries) == 0 { - return false, nil - } - log := logEntries[0] - if log.JSONPayload != nil { - return false, fmt.Errorf("got json log entry %v, wanted plain text", log.JSONPayload) - } - if len(log.TextPayload) > maxLength { - return false, fmt.Errorf("got too long entry of length %d", len(log.TextPayload)) - } - return true, nil - }, utils.JustTimeout, pod.Name()) - err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) - framework.ExpectNoError(err) - }) - }) - }) - - ginkgo.It("should ingest events [Feature:StackdriverLogging]", func() { - eventCreationInterval := 10 * time.Second - - withLogProviderForScope(f, eventsScope, func(p *sdLogProvider) { - ginkgo.By("Running pods to generate events while waiting for some of them to be ingested") - stopCh := make(chan struct{}) - cleanupCh := make(chan struct{}) - defer func() { <-cleanupCh }() - defer close(stopCh) - go func() { - defer ginkgo.GinkgoRecover() - defer close(cleanupCh) - - wait.PollUntil(eventCreationInterval, func() (bool, error) { - podName := fmt.Sprintf("synthlogger-%s", string(uuid.NewUUID())) - err := utils.NewLoadLoggingPod(podName, "", 1, 1*time.Second).Start(f) - if err != nil { - framework.Logf("Failed to create a logging pod: %v", err) - } - return false, nil - }, stopCh) - }() - - ginkgo.By("Waiting for events to ingest") - location := framework.TestContext.CloudConfig.Zone - if framework.TestContext.CloudConfig.MultiMaster { - location = framework.TestContext.CloudConfig.Region - } - c := utils.NewLogChecker(p, utils.UntilFirstEntryFromLocation(location), utils.JustTimeout, "") - err := utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) - framework.ExpectNoError(err) - }) - }) - - ginkgo.It("should ingest system logs from all nodes [Feature:StackdriverLogging]", func() { - withLogProviderForScope(f, systemScope, func(p *sdLogProvider) { - ginkgo.By("Waiting for some kubelet logs to be ingested from each node", func() { - nodeIds := utils.GetNodeIds(f.ClientSet) - log := fmt.Sprintf("projects/%s/logs/kubelet", framework.TestContext.CloudConfig.ProjectID) - c := utils.NewLogChecker(p, utils.UntilFirstEntryFromLog(log), utils.JustTimeout, nodeIds...) - err := utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) - framework.ExpectNoError(err) - }) - - ginkgo.By("Waiting for some container runtime logs to be ingested from each node", func() { - nodeIds := utils.GetNodeIds(f.ClientSet) - log := fmt.Sprintf("projects/%s/logs/container-runtime", framework.TestContext.CloudConfig.ProjectID) - c := utils.NewLogChecker(p, utils.UntilFirstEntryFromLog(log), utils.JustTimeout, nodeIds...) - err := utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) - framework.ExpectNoError(err) - }) - }) - }) -}) diff --git a/test/e2e/instrumentation/logging/stackdriver/soak.go b/test/e2e/instrumentation/logging/stackdriver/soak.go deleted file mode 100644 index bdb2960ca9d..00000000000 --- a/test/e2e/instrumentation/logging/stackdriver/soak.go +++ /dev/null @@ -1,101 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package stackdriver - -import ( - "fmt" - "math" - "time" - - "k8s.io/kubernetes/test/e2e/framework" - e2enode "k8s.io/kubernetes/test/e2e/framework/node" - instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common" - "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils" - - "github.com/onsi/ginkgo" -) - -const ( - // maxAllowedLostFraction is the fraction of lost logs considered acceptable. - maxAllowedLostFraction = 0.01 - // maxAllowedRestartsPerHour is the number of fluentd container restarts - // considered acceptable. Once per hour is fine for now, as long as it - // doesn't loose too much logs. - maxAllowedRestartsPerHour = 1.0 -) - -var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackdriver [Feature:StackdriverLogging] [Soak]", func() { - f := framework.NewDefaultFramework("sd-logging-load") - - ginkgo.It("should ingest logs from applications running for a prolonged amount of time", func() { - withLogProviderForScope(f, podsScope, func(p *sdLogProvider) { - nodes, err := e2enode.GetReadySchedulableNodes(f.ClientSet) - framework.ExpectNoError(err) - maxPodCount := 10 - jobDuration := 30 * time.Minute - linesPerPodPerSecond := 100 - // TODO(instrumentation): Increase to 21 hrs - testDuration := 3 * time.Hour - ingestionInterval := 1 * time.Minute - ingestionTimeout := testDuration + 30*time.Minute - allowedRestarts := int(math.Ceil(float64(testDuration) / - float64(time.Hour) * maxAllowedRestartsPerHour)) - - podRunDelay := time.Duration(int64(jobDuration) / int64(maxPodCount)) - podRunCount := maxPodCount*(int(testDuration/jobDuration)-1) + 1 - linesPerPod := linesPerPodPerSecond * int(jobDuration.Seconds()) - - // pods is a flat array of all pods to be run and to expect in Stackdriver. - pods := []utils.FiniteLoggingPod{} - // podsByRun is a two-dimensional array of pods, first dimension is the run - // index, the second dimension is the node index. Since we want to create - // an equal load on all nodes, for the same run we have one pod per node. - podsByRun := [][]utils.FiniteLoggingPod{} - for runIdx := 0; runIdx < podRunCount; runIdx++ { - podsInRun := []utils.FiniteLoggingPod{} - for nodeIdx, node := range nodes.Items { - podName := fmt.Sprintf("job-logs-generator-%d-%d-%d-%d", maxPodCount, linesPerPod, runIdx, nodeIdx) - pod := utils.NewLoadLoggingPod(podName, node.Name, linesPerPod, jobDuration) - pods = append(pods, pod) - podsInRun = append(podsInRun, pod) - } - podsByRun = append(podsByRun, podsInRun) - } - - ginkgo.By("Running short-living pods") - go func() { - t := time.NewTicker(podRunDelay) - defer t.Stop() - for runIdx := 0; runIdx < podRunCount; runIdx++ { - // Starting one pod on each node. - for _, pod := range podsByRun[runIdx] { - if err := pod.Start(f); err != nil { - framework.Logf("Failed to start pod: %v", err) - } - } - <-t.C - } - }() - - checker := utils.NewFullIngestionPodLogChecker(p, maxAllowedLostFraction, pods...) - err = utils.WaitForLogs(checker, ingestionInterval, ingestionTimeout) - framework.ExpectNoError(err) - - utils.EnsureLoggingAgentRestartsCount(f, p.LoggingAgentName(), allowedRestarts) - }) - }) -}) diff --git a/test/e2e/instrumentation/logging/stackdriver/utils.go b/test/e2e/instrumentation/logging/stackdriver/utils.go deleted file mode 100644 index 81ad806538e..00000000000 --- a/test/e2e/instrumentation/logging/stackdriver/utils.go +++ /dev/null @@ -1,444 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package stackdriver - -import ( - "context" - "encoding/base64" - "encoding/json" - "fmt" - "sync" - "time" - - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/kubernetes/test/e2e/framework" - "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils" - - "golang.org/x/oauth2/google" - sd "google.golang.org/api/logging/v2beta1" - "google.golang.org/api/option" - pubsub "google.golang.org/api/pubsub/v1" -) - -const ( - // The amount of time to wait for Stackdriver Logging - // sink to become operational - sinkStartupTimeout = 10 * time.Minute - - // The limit on the number of messages to pull from PubSub - maxPullLogMessages = 100 * 1000 - - // maxQueueSize is the limit on the number of messages in the single queue. - maxQueueSize = 10 * 1000 - - // PubSub topic with log entries polling interval - sdLoggingPollInterval = 100 * time.Millisecond - - // The parallelism level of polling logs process. - sdLoggingPollParallelism = 10 - - // The limit on the number of stackdriver sinks that can be created within one project. - stackdriverSinkCountLimit = 90 -) - -type logProviderScope int - -const ( - podsScope logProviderScope = iota - eventsScope - systemScope -) - -var _ utils.LogProvider = &sdLogProvider{} - -type sdLogProvider struct { - sdService *sd.Service - pubsubService *pubsub.Service - - framework *framework.Framework - - topic *pubsub.Topic - subscription *pubsub.Subscription - logSink *sd.LogSink - - pollingStopChannel chan struct{} - pollingWG *sync.WaitGroup - - queueCollection utils.LogsQueueCollection - - scope logProviderScope -} - -func newSdLogProvider(f *framework.Framework, scope logProviderScope) (*sdLogProvider, error) { - ctx := context.Background() - hc, err := google.DefaultClient(ctx, sd.CloudPlatformScope) - framework.ExpectNoError(err) - sdService, err := sd.NewService(ctx, option.WithHTTPClient(hc)) - if err != nil { - return nil, err - } - err = ensureProjectHasSinkCapacity(sdService.Projects.Sinks, framework.TestContext.CloudConfig.ProjectID) - if err != nil { - return nil, err - } - - pubsubService, err := pubsub.NewService(ctx, option.WithHTTPClient(hc)) - if err != nil { - return nil, err - } - - provider := &sdLogProvider{ - scope: scope, - sdService: sdService, - pubsubService: pubsubService, - framework: f, - pollingStopChannel: make(chan struct{}), - pollingWG: &sync.WaitGroup{}, - queueCollection: utils.NewLogsQueueCollection(maxQueueSize), - } - return provider, nil -} - -func ensureProjectHasSinkCapacity(sinksService *sd.ProjectsSinksService, projectID string) error { - listResponse, err := listSinks(sinksService, projectID) - if err != nil { - return err - } - if len(listResponse.Sinks) >= stackdriverSinkCountLimit { - framework.Logf("Reached Stackdriver sink limit. Deleting all sinks") - deleteSinks(sinksService, projectID, listResponse.Sinks) - } - return nil -} - -func listSinks(sinksService *sd.ProjectsSinksService, projectID string) (*sd.ListSinksResponse, error) { - projectDst := fmt.Sprintf("projects/%s", projectID) - listResponse, err := sinksService.List(projectDst).PageSize(stackdriverSinkCountLimit).Do() - if err != nil { - return nil, fmt.Errorf("failed to list Stackdriver Logging sinks: %v", err) - } - return listResponse, nil -} - -func deleteSinks(sinksService *sd.ProjectsSinksService, projectID string, sinks []*sd.LogSink) { - for _, sink := range sinks { - sinkNameID := fmt.Sprintf("projects/%s/sinks/%s", projectID, sink.Name) - if _, err := sinksService.Delete(sinkNameID).Do(); err != nil { - framework.Logf("Failed to delete LogSink: %v", err) - } - } -} - -func (p *sdLogProvider) Init() error { - projectID := framework.TestContext.CloudConfig.ProjectID - nsName := p.framework.Namespace.Name - - topic, err := p.createPubSubTopic(projectID, nsName) - if err != nil { - return fmt.Errorf("failed to create PubSub topic: %v", err) - } - p.topic = topic - - subs, err := p.createPubSubSubscription(projectID, nsName, topic.Name) - if err != nil { - return fmt.Errorf("failed to create PubSub subscription: %v", err) - } - p.subscription = subs - - logSink, err := p.createSink(projectID, nsName, topic.Name) - if err != nil { - return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err) - } - p.logSink = logSink - - if err = p.authorizeSink(); err != nil { - return fmt.Errorf("failed to authorize log sink: %v", err) - } - - if err = p.waitSinkInit(); err != nil { - return fmt.Errorf("failed to wait for sink to become operational: %v", err) - } - - p.startPollingLogs() - - return nil -} - -func (p *sdLogProvider) Cleanup() { - close(p.pollingStopChannel) - p.pollingWG.Wait() - - if p.logSink != nil { - projectID := framework.TestContext.CloudConfig.ProjectID - sinkNameID := fmt.Sprintf("projects/%s/sinks/%s", projectID, p.logSink.Name) - sinksService := p.sdService.Projects.Sinks - if _, err := sinksService.Delete(sinkNameID).Do(); err != nil { - framework.Logf("Failed to delete LogSink: %v", err) - } - } - - if p.subscription != nil { - subsService := p.pubsubService.Projects.Subscriptions - if _, err := subsService.Delete(p.subscription.Name).Do(); err != nil { - framework.Logf("Failed to delete PubSub subscription: %v", err) - } - } - - if p.topic != nil { - topicsService := p.pubsubService.Projects.Topics - if _, err := topicsService.Delete(p.topic.Name).Do(); err != nil { - framework.Logf("Failed to delete PubSub topic: %v", err) - } - } -} - -func (p *sdLogProvider) ReadEntries(name string) []utils.LogEntry { - return p.queueCollection.Pop(name) -} - -func (p *sdLogProvider) LoggingAgentName() string { - return "fluentd-gcp" -} - -func (p *sdLogProvider) createPubSubTopic(projectID, topicName string) (*pubsub.Topic, error) { - topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectID, topicName) - topic := &pubsub.Topic{ - Name: topicFullName, - } - return p.pubsubService.Projects.Topics.Create(topicFullName, topic).Do() -} - -func (p *sdLogProvider) createPubSubSubscription(projectID, subsName, topicName string) (*pubsub.Subscription, error) { - subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subsName) - subs := &pubsub.Subscription{ - Name: subsFullName, - Topic: topicName, - } - return p.pubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do() -} - -func (p *sdLogProvider) createSink(projectID, sinkName, topicName string) (*sd.LogSink, error) { - filter, err := p.buildFilter() - if err != nil { - return nil, err - } - framework.Logf("Using the following filter for log entries: %s", filter) - sink := &sd.LogSink{ - Name: sinkName, - Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName), - Filter: filter, - } - projectDst := fmt.Sprintf("projects/%s", projectID) - return p.sdService.Projects.Sinks.Create(projectDst, sink).Do() -} - -func (p *sdLogProvider) buildFilter() (string, error) { - switch p.scope { - case podsScope: - return fmt.Sprintf("resource.type=\"container\" AND resource.labels.namespace_id=\"%s\"", - p.framework.Namespace.Name), nil - case eventsScope: - return fmt.Sprintf("resource.type=\"gke_cluster\" AND jsonPayload.metadata.namespace=\"%s\"", - p.framework.Namespace.Name), nil - case systemScope: - // TODO(instrumentation): Filter logs from the current project only. - return "resource.type=\"gce_instance\"", nil - } - return "", fmt.Errorf("Unknown log provider scope: %v", p.scope) -} - -func (p *sdLogProvider) authorizeSink() error { - topicsService := p.pubsubService.Projects.Topics - policy, err := topicsService.GetIamPolicy(p.topic.Name).Do() - if err != nil { - return err - } - - binding := &pubsub.Binding{ - Role: "roles/pubsub.publisher", - Members: []string{p.logSink.WriterIdentity}, - } - policy.Bindings = append(policy.Bindings, binding) - req := &pubsub.SetIamPolicyRequest{Policy: policy} - if _, err = topicsService.SetIamPolicy(p.topic.Name, req).Do(); err != nil { - return err - } - - return nil -} - -func (p *sdLogProvider) waitSinkInit() error { - framework.Logf("Waiting for log sink to become operational") - return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) { - err := publish(p.pubsubService, p.topic, "embrace eternity") - if err != nil { - framework.Logf("Failed to push message to PubSub due to %v", err) - } - - messages, err := pullAndAck(p.pubsubService, p.subscription) - if err != nil { - framework.Logf("Failed to pull messages from PubSub due to %v", err) - return false, nil - } - if len(messages) > 0 { - framework.Logf("Sink %s is operational", p.logSink.Name) - return true, nil - } - - return false, nil - }) -} - -func (p *sdLogProvider) startPollingLogs() { - for i := 0; i < sdLoggingPollParallelism; i++ { - p.pollingWG.Add(1) - go func() { - defer p.pollingWG.Done() - - wait.PollUntil(sdLoggingPollInterval, func() (bool, error) { - p.pollLogsOnce() - return false, nil - }, p.pollingStopChannel) - }() - } -} - -func (p *sdLogProvider) pollLogsOnce() { - messages, err := pullAndAck(p.pubsubService, p.subscription) - if err != nil { - framework.Logf("Failed to pull messages from PubSub due to %v", err) - return - } - - for _, msg := range messages { - logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data) - if err != nil { - framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data) - continue - } - - var sdLogEntry sd.LogEntry - if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil { - framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err) - continue - } - - name, ok := p.tryGetName(sdLogEntry) - if !ok { - framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type) - continue - } - - logEntry, err := convertLogEntry(sdLogEntry) - if err != nil { - framework.Logf("Failed to parse Stackdriver LogEntry: %v", err) - continue - } - - p.queueCollection.Push(name, logEntry) - } -} - -func (p *sdLogProvider) tryGetName(sdLogEntry sd.LogEntry) (string, bool) { - switch sdLogEntry.Resource.Type { - case "container": - return sdLogEntry.Resource.Labels["pod_id"], true - case "gke_cluster": - return "", true - case "gce_instance": - return sdLogEntry.Resource.Labels["instance_id"], true - } - return "", false -} - -func convertLogEntry(sdLogEntry sd.LogEntry) (entry utils.LogEntry, err error) { - entry = utils.LogEntry{LogName: sdLogEntry.LogName} - entry.Location = sdLogEntry.Resource.Labels["location"] - - if sdLogEntry.TextPayload != "" { - entry.TextPayload = sdLogEntry.TextPayload - return - } - - bytes, err := sdLogEntry.JsonPayload.MarshalJSON() - if err != nil { - err = fmt.Errorf("Failed to get jsonPayload from LogEntry %v", sdLogEntry) - return - } - - var jsonObject map[string]interface{} - err = json.Unmarshal(bytes, &jsonObject) - if err != nil { - err = fmt.Errorf("Failed to deserialize jsonPayload as json object %s", string(bytes[:])) - return - } - entry.JSONPayload = jsonObject - return -} - -func pullAndAck(service *pubsub.Service, subs *pubsub.Subscription) ([]*pubsub.ReceivedMessage, error) { - subsService := service.Projects.Subscriptions - req := &pubsub.PullRequest{ - ReturnImmediately: true, - MaxMessages: maxPullLogMessages, - } - - resp, err := subsService.Pull(subs.Name, req).Do() - if err != nil { - return nil, err - } - - var ids []string - for _, msg := range resp.ReceivedMessages { - ids = append(ids, msg.AckId) - } - if len(ids) > 0 { - ackReq := &pubsub.AcknowledgeRequest{AckIds: ids} - if _, err = subsService.Acknowledge(subs.Name, ackReq).Do(); err != nil { - framework.Logf("Failed to ack poll: %v", err) - } - } - - return resp.ReceivedMessages, nil -} - -func publish(service *pubsub.Service, topic *pubsub.Topic, msg string) error { - topicsService := service.Projects.Topics - req := &pubsub.PublishRequest{ - Messages: []*pubsub.PubsubMessage{ - { - Data: base64.StdEncoding.EncodeToString([]byte(msg)), - }, - }, - } - _, err := topicsService.Publish(topic.Name, req).Do() - return err -} - -func withLogProviderForScope(f *framework.Framework, scope logProviderScope, fun func(*sdLogProvider)) { - p, err := newSdLogProvider(f, scope) - framework.ExpectNoError(err, "Failed to create Stackdriver logs provider") - - err = p.Init() - defer p.Cleanup() - framework.ExpectNoError(err, "Failed to init Stackdriver logs provider") - - err = utils.EnsureLoggingAgentDeployment(f, p.LoggingAgentName()) - framework.ExpectNoError(err, "Logging agents deployed incorrectly") - - fun(p) -}