Fix Stackdriver Logging e2e tests

Signed-off-by: Mik Vyatskov <vmik@google.com>
This commit is contained in:
Mik Vyatskov 2017-12-18 15:46:46 +01:00
parent a35ac9e8fc
commit e8a72dbb4b
2 changed files with 57 additions and 49 deletions

View File

@ -18,7 +18,6 @@ package stackdriver
import ( import (
"fmt" "fmt"
"strings"
"time" "time"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -107,19 +106,14 @@ var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackd
err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
framework.ExpectNoError(err) framework.ExpectNoError(err)
}) })
})
})
ginkgo.It("should ingest logs [Feature:StackdriverLogging]", func() {
withLogProviderForScope(f, podsScope, func(p *sdLogProvider) {
ginkgo.By("Checking that too long lines are trimmed", func() { ginkgo.By("Checking that too long lines are trimmed", func() {
originalLength := 100001 maxLength := 100000
cmd := []string{ cmd := []string{
"/bin/sh", "/bin/sh",
"-c", "-c",
fmt.Sprintf("while :; do printf '%%*s' %d | tr ' ' 'A'; echo; sleep 60; done", originalLength), fmt.Sprintf("while :; do printf '%%*s' %d | tr ' ' 'A'; echo; sleep 60; done", maxLength+1),
} }
trimPrefix := "[Trimmed]"
pod, err := utils.StartAndReturnSelf(utils.NewExecLoggingPod("synthlogger-4", cmd), f) pod, err := utils.StartAndReturnSelf(utils.NewExecLoggingPod("synthlogger-4", cmd), f)
framework.ExpectNoError(err, "Failed to start a pod") framework.ExpectNoError(err, "Failed to start a pod")
@ -133,11 +127,8 @@ var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackd
if log.JSONPayload != nil { if log.JSONPayload != nil {
return false, fmt.Errorf("got json log entry %v, wanted plain text", log.JSONPayload) return false, fmt.Errorf("got json log entry %v, wanted plain text", log.JSONPayload)
} }
if len(log.TextPayload) == originalLength { if len(log.TextPayload) > maxLength {
return false, fmt.Errorf("got non-trimmed entry of length %d", len(log.TextPayload)) return false, fmt.Errorf("got too long entry of length %d", len(log.TextPayload))
}
if !strings.HasPrefix(log.TextPayload, trimPrefix) {
return false, fmt.Errorf("got message without prefix '%s': %s", trimPrefix, log.TextPayload)
} }
return true, nil return true, nil
}, utils.JustTimeout, pod.Name()) }, utils.JustTimeout, pod.Name())

View File

@ -20,6 +20,7 @@ import (
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sync"
"time" "time"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -45,6 +46,9 @@ const (
// PubSub topic with log entries polling interval // PubSub topic with log entries polling interval
sdLoggingPollInterval = 100 * time.Millisecond sdLoggingPollInterval = 100 * time.Millisecond
// The parallelism level of polling logs process.
sdLoggingPollParallelism = 10
) )
type logProviderScope int type logProviderScope int
@ -68,6 +72,7 @@ type sdLogProvider struct {
logSink *sd.LogSink logSink *sd.LogSink
pollingStopChannel chan struct{} pollingStopChannel chan struct{}
pollingWG *sync.WaitGroup
queueCollection utils.LogsQueueCollection queueCollection utils.LogsQueueCollection
@ -92,7 +97,8 @@ func newSdLogProvider(f *framework.Framework, scope logProviderScope) (*sdLogPro
sdService: sdService, sdService: sdService,
pubsubService: pubsubService, pubsubService: pubsubService,
framework: f, framework: f,
pollingStopChannel: make(chan struct{}, 1), pollingStopChannel: make(chan struct{}),
pollingWG: &sync.WaitGroup{},
queueCollection: utils.NewLogsQueueCollection(maxQueueSize), queueCollection: utils.NewLogsQueueCollection(maxQueueSize),
} }
return provider, nil return provider, nil
@ -128,13 +134,14 @@ func (p *sdLogProvider) Init() error {
return fmt.Errorf("failed to wait for sink to become operational: %v", err) return fmt.Errorf("failed to wait for sink to become operational: %v", err)
} }
go p.pollLogs() p.startPollingLogs()
return nil return nil
} }
func (p *sdLogProvider) Cleanup() { func (p *sdLogProvider) Cleanup() {
p.pollingStopChannel <- struct{}{} close(p.pollingStopChannel)
p.pollingWG.Wait()
if p.logSink != nil { if p.logSink != nil {
projectID := framework.TestContext.CloudConfig.ProjectID projectID := framework.TestContext.CloudConfig.ProjectID
@ -257,12 +264,25 @@ func (p *sdLogProvider) waitSinkInit() error {
}) })
} }
func (p *sdLogProvider) pollLogs() { func (p *sdLogProvider) startPollingLogs() {
for i := 0; i < sdLoggingPollParallelism; i++ {
p.pollingWG.Add(1)
go func() {
defer p.pollingWG.Done()
wait.PollUntil(sdLoggingPollInterval, func() (bool, error) { wait.PollUntil(sdLoggingPollInterval, func() (bool, error) {
p.pollLogsOnce()
return false, nil
}, p.pollingStopChannel)
}()
}
}
func (p *sdLogProvider) pollLogsOnce() {
messages, err := pullAndAck(p.pubsubService, p.subscription) messages, err := pullAndAck(p.pubsubService, p.subscription)
if err != nil { if err != nil {
framework.Logf("Failed to pull messages from PubSub due to %v", err) framework.Logf("Failed to pull messages from PubSub due to %v", err)
return false, nil return
} }
for _, msg := range messages { for _, msg := range messages {
@ -292,9 +312,6 @@ func (p *sdLogProvider) pollLogs() {
p.queueCollection.Push(name, logEntry) p.queueCollection.Push(name, logEntry)
} }
return false, nil
}, p.pollingStopChannel)
} }
func (p *sdLogProvider) tryGetName(sdLogEntry sd.LogEntry) (string, bool) { func (p *sdLogProvider) tryGetName(sdLogEntry sd.LogEntry) (string, bool) {