From d8525f8bd1a9cb732cc744ebd9a78589c4855ce1 Mon Sep 17 00:00:00 2001 From: Mik Vyatskov Date: Mon, 11 Sep 2017 18:51:46 +0200 Subject: [PATCH] [fluentd-gcp addon] Trim too long log entries due to Stackdriver limitation --- .../fluentd-gcp/fluentd-gcp-configmap.yaml | 14 +++++- .../addons/fluentd-gcp/fluentd-gcp-ds.yaml | 2 +- .../logging/stackdrvier/basic.go | 34 +++++++++++++++ .../logging/utils/logging_pod.go | 43 +++++++++++-------- 4 files changed, 73 insertions(+), 20 deletions(-) diff --git a/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml b/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml index 1b06178f6f1..ebe364cab8d 100644 --- a/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml +++ b/cluster/addons/fluentd-gcp/fluentd-gcp-configmap.yaml @@ -345,6 +345,18 @@ data: + # TODO(instrumentation): Reconsider this workaround later. + # Trim the entries which exceed slightly less than 100KB, to avoid + # dropping them. It is a necessity, because Stackdriver only supports + # entries that are up to 100KB in size. + + @type record_transformer + enable_ruby true + + log ${record['log'].length > 100000 ? "[Trimmed]#{record['log'][0..100000]}..." : record['log']} + + + # We use 2 output stanzas - one to handle the container logs and one to handle # the node daemon logs, the latter of which explicitly sends its logs to the # compute.googleapis.com service rather than container.googleapis.com to keep @@ -396,7 +408,7 @@ data: num_threads 2 metadata: - name: fluentd-gcp-config-v1.1.2 + name: fluentd-gcp-config-v1.2.0 namespace: kube-system labels: addonmanager.kubernetes.io/mode: Reconcile diff --git a/cluster/addons/fluentd-gcp/fluentd-gcp-ds.yaml b/cluster/addons/fluentd-gcp/fluentd-gcp-ds.yaml index d107e90a5c4..d6da3fbffb8 100644 --- a/cluster/addons/fluentd-gcp/fluentd-gcp-ds.yaml +++ b/cluster/addons/fluentd-gcp/fluentd-gcp-ds.yaml @@ -117,7 +117,7 @@ spec: path: /usr/lib64 - name: config-volume configMap: - name: fluentd-gcp-config-v1.1.2 + name: fluentd-gcp-config-v1.2.0 - name: ssl-certs hostPath: path: /etc/ssl/certs diff --git a/test/e2e/instrumentation/logging/stackdrvier/basic.go b/test/e2e/instrumentation/logging/stackdrvier/basic.go index 8e21b07c0e9..f7f0b394c17 100644 --- a/test/e2e/instrumentation/logging/stackdrvier/basic.go +++ b/test/e2e/instrumentation/logging/stackdrvier/basic.go @@ -18,6 +18,7 @@ package stackdriver import ( "fmt" + "strings" "time" "k8s.io/apimachinery/pkg/util/wait" @@ -107,6 +108,39 @@ var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackd err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) framework.ExpectNoError(err) }) + + ginkgo.By("Checking that too long lines are trimmed", func() { + originalLength := 100001 + cmd := []string{ + "/bin/sh", + "-c", + fmt.Sprintf("while :; do printf '%%*s' %d | tr ' ' 'A'; echo; sleep 1; done", originalLength), + } + trimPrefix := "[Trimmed]" + + pod, err := utils.StartAndReturnSelf(utils.NewExecLoggingPod("synthlogger-4", cmd), f) + framework.ExpectNoError(err, "Failed to start a pod") + + ginkgo.By("Waiting for logs to ingest") + c := utils.NewLogChecker(p, func(_ string, logEntries []utils.LogEntry) (bool, error) { + if len(logEntries) == 0 { + return false, nil + } + log := logEntries[0] + if log.JSONPayload != nil { + return false, fmt.Errorf("got json log entry %v, wanted plain text", log.JSONPayload) + } + if len(log.TextPayload) == originalLength { + return false, fmt.Errorf("got non-trimmed entry of length %d", len(log.TextPayload)) + } + if !strings.HasPrefix(log.TextPayload, trimPrefix) { + return false, fmt.Errorf("got message without prefix '%s': %s", trimPrefix, log.TextPayload) + } + return true, nil + }, utils.JustTimeout, pod.Name()) + err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout) + framework.ExpectNoError(err) + }) }) }) diff --git a/test/e2e/instrumentation/logging/utils/logging_pod.go b/test/e2e/instrumentation/logging/utils/logging_pod.go index f950c64460b..13a099f7efa 100644 --- a/test/e2e/instrumentation/logging/utils/logging_pod.go +++ b/test/e2e/instrumentation/logging/utils/logging_pod.go @@ -134,27 +134,38 @@ func (p *loadLoggingPod) ExpectedLineCount() int { return p.expectedLinesCount } -var _ LoggingPod = &repeatingLoggingPod{} - -type repeatingLoggingPod struct { - name string - line string -} - // NewRepeatingLoggingPod returns a logging pod that each second prints // line value to its stdout. func NewRepeatingLoggingPod(podName string, line string) LoggingPod { - return &repeatingLoggingPod{ + cmd := []string{ + "/bin/sh", + "-c", + fmt.Sprintf("while :; do echo '%s'; sleep 1; done", line), + } + return NewExecLoggingPod(podName, cmd) +} + +var _ LoggingPod = &execLoggingPod{} + +type execLoggingPod struct { + name string + cmd []string +} + +// NewExecLoggingPod returns a logging pod that produces logs through +// executing a command, passed in cmd. +func NewExecLoggingPod(podName string, cmd []string) LoggingPod { + return &execLoggingPod{ name: podName, - line: line, + cmd: cmd, } } -func (p *repeatingLoggingPod) Name() string { +func (p *execLoggingPod) Name() string { return p.name } -func (p *repeatingLoggingPod) Start(f *framework.Framework) error { +func (p *execLoggingPod) Start(f *framework.Framework) error { framework.Logf("Starting repeating logging pod %s", p.name) f.PodClient().Create(&api_v1.Pod{ ObjectMeta: meta_v1.ObjectMeta{ @@ -163,13 +174,9 @@ func (p *repeatingLoggingPod) Start(f *framework.Framework) error { Spec: api_v1.PodSpec{ Containers: []api_v1.Container{ { - Name: loggingContainerName, - Image: "busybox", - Command: []string{ - "/bin/sh", - "-c", - fmt.Sprintf("while :; do echo '%s'; sleep 1; done", p.line), - }, + Name: loggingContainerName, + Image: "busybox", + Command: p.cmd, Resources: api_v1.ResourceRequirements{ Requests: api_v1.ResourceList{ api_v1.ResourceCPU: *resource.NewMilliQuantity(