From e8a72dbb4ba1369b0524655f27c32531ba144d9e Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Mon, 18 Dec 2017 15:46:46 +0100 Subject: [PATCH] Fix Stackdriver Logging e2e tests Signed-off-by: Mik Vyatskov --- .../logging/stackdrvier/basic.go | 17 +--- .../logging/stackdrvier/utils.go | 89 +++++++++++-------- 2 files changed, 57 insertions(+), 49 deletions(-) diff --git a/test/e2e/instrumentation/logging/stackdrvier/basic.go b/test/e2e/instrumentation/logging/stackdrvier/basic.go index ae1ba4a6f77..7bc5e682b5f 100644 --- a/test/e2e/instrumentation/logging/stackdrvier/basic.go +++ b/test/e2e/instrumentation/logging/stackdrvier/basic.go @@ -18,7 +18,6 @@ package stackdriver import ( "fmt" - "strings" "time" "k8s.io/apimachinery/pkg/util/wait" @@ -107,19 +106,14 @@ var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackd err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) framework.ExpectNoError(err) }) - }) - }) - ginkgo.It("should ingest logs [Feature:StackdriverLogging]", func() { - withLogProviderForScope(f, podsScope, func(p *sdLogProvider) { ginkgo.By("Checking that too long lines are trimmed", func() { - originalLength := 100001 + maxLength := 100000 cmd := []string{ "/bin/sh", "-c", - fmt.Sprintf("while :; do printf '%%*s' %d | tr ' ' 'A'; echo; sleep 60; done", originalLength), + fmt.Sprintf("while :; do printf '%%*s' %d | tr ' ' 'A'; echo; sleep 60; done", maxLength+1), } - trimPrefix := "[Trimmed]" pod, err := utils.StartAndReturnSelf(utils.NewExecLoggingPod("synthlogger-4", cmd), f) framework.ExpectNoError(err, "Failed to start a pod") @@ -133,11 +127,8 @@ var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackd if log.JSONPayload != nil { return false, fmt.Errorf("got json log entry %v, wanted plain text", log.JSONPayload) } - if len(log.TextPayload) == originalLength { - return false, fmt.Errorf("got non-trimmed entry of length %d", len(log.TextPayload)) - } - if !strings.HasPrefix(log.TextPayload, trimPrefix) { - return false, fmt.Errorf("got message without prefix '%s': %s", trimPrefix, log.TextPayload) + if len(log.TextPayload) > maxLength { + return false, fmt.Errorf("got too long entry of length %d", len(log.TextPayload)) } return true, nil }, utils.JustTimeout, pod.Name()) diff --git a/test/e2e/instrumentation/logging/stackdrvier/utils.go b/test/e2e/instrumentation/logging/stackdrvier/utils.go index 7cd7bd7ebe6..482fc120ece 100644 --- a/test/e2e/instrumentation/logging/stackdrvier/utils.go +++ b/test/e2e/instrumentation/logging/stackdrvier/utils.go @@ -20,6 +20,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "sync" "time" "k8s.io/apimachinery/pkg/util/wait" @@ -45,6 +46,9 @@ const ( // PubSub topic with log entries polling interval sdLoggingPollInterval = 100 * time.Millisecond + + // The parallelism level of polling logs process. + sdLoggingPollParallelism = 10 ) type logProviderScope int @@ -68,6 +72,7 @@ type sdLogProvider struct { logSink *sd.LogSink pollingStopChannel chan struct{} + pollingWG *sync.WaitGroup queueCollection utils.LogsQueueCollection @@ -92,7 +97,8 @@ func newSdLogProvider(f *framework.Framework, scope logProviderScope) (*sdLogPro sdService: sdService, pubsubService: pubsubService, framework: f, - pollingStopChannel: make(chan struct{}, 1), + pollingStopChannel: make(chan struct{}), + pollingWG: &sync.WaitGroup{}, queueCollection: utils.NewLogsQueueCollection(maxQueueSize), } return provider, nil @@ -128,13 +134,14 @@ func (p *sdLogProvider) Init() error { return fmt.Errorf("failed to wait for sink to become operational: %v", err) } - go p.pollLogs() + p.startPollingLogs() return nil } func (p *sdLogProvider) Cleanup() { - p.pollingStopChannel <- struct{}{} + close(p.pollingStopChannel) + p.pollingWG.Wait() if p.logSink != nil { projectID := framework.TestContext.CloudConfig.ProjectID @@ -257,44 +264,54 @@ func (p *sdLogProvider) waitSinkInit() error { }) } -func (p *sdLogProvider) pollLogs() { - wait.PollUntil(sdLoggingPollInterval, func() (bool, error) { - messages, err := pullAndAck(p.pubsubService, p.subscription) +func (p *sdLogProvider) startPollingLogs() { + for i := 0; i < sdLoggingPollParallelism; i++ { + p.pollingWG.Add(1) + go func() { + defer p.pollingWG.Done() + + wait.PollUntil(sdLoggingPollInterval, func() (bool, error) { + p.pollLogsOnce() + return false, nil + }, p.pollingStopChannel) + }() + } +} + +func (p *sdLogProvider) pollLogsOnce() { + messages, err := pullAndAck(p.pubsubService, p.subscription) + if err != nil { + framework.Logf("Failed to pull messages from PubSub due to %v", err) + return + } + + for _, msg := range messages { + logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data) if err != nil { - framework.Logf("Failed to pull messages from PubSub due to %v", err) - return false, nil + framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data) + continue } - for _, msg := range messages { - logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data) - if err != nil { - framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data) - continue - } - - var sdLogEntry sd.LogEntry - if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil { - framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err) - continue - } - - name, ok := p.tryGetName(sdLogEntry) - if !ok { - framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type) - continue - } - - logEntry, err := convertLogEntry(sdLogEntry) - if err != nil { - framework.Logf("Failed to parse Stackdriver LogEntry: %v", err) - continue - } - - p.queueCollection.Push(name, logEntry) + var sdLogEntry sd.LogEntry + if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil { + framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err) + continue } - return false, nil - }, p.pollingStopChannel) + name, ok := p.tryGetName(sdLogEntry) + if !ok { + framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type) + continue + } + + logEntry, err := convertLogEntry(sdLogEntry) + if err != nil { + framework.Logf("Failed to parse Stackdriver LogEntry: %v", err) + continue + } + + p.queueCollection.Push(name, logEntry) + } } func (p *sdLogProvider) tryGetName(sdLogEntry sd.LogEntry) (string, bool) {