[fluentd-gcp addon] Trim too long log entries due to Stackdriver limitation

This commit is contained in:
Mik Vyatskov 2017-09-11 18:51:46 +02:00
parent b05d8ad1ec
commit d8525f8bd1
4 changed files with 73 additions and 20 deletions

View File

@ -345,6 +345,18 @@ data:
</metric>
</filter>
# TODO(instrumentation): Reconsider this workaround later.
# Trim the entries which exceed slightly less than 100KB, to avoid
# dropping them. It is a necessity, because Stackdriver only supports
# entries that are up to 100KB in size.
<filter kubernetes.**>
@type record_transformer
enable_ruby true
<record>
log ${record['log'].length > 100000 ? "[Trimmed]#{record['log'][0..100000]}..." : record['log']}
</record>
</filter>
# We use 2 output stanzas - one to handle the container logs and one to handle
# the node daemon logs, the latter of which explicitly sends its logs to the
# compute.googleapis.com service rather than container.googleapis.com to keep
@ -396,7 +408,7 @@ data:
num_threads 2
</match>
metadata:
name: fluentd-gcp-config-v1.1.2
name: fluentd-gcp-config-v1.2.0
namespace: kube-system
labels:
addonmanager.kubernetes.io/mode: Reconcile

View File

@ -117,7 +117,7 @@ spec:
path: /usr/lib64
- name: config-volume
configMap:
name: fluentd-gcp-config-v1.1.2
name: fluentd-gcp-config-v1.2.0
- name: ssl-certs
hostPath:
path: /etc/ssl/certs

View File

@ -18,6 +18,7 @@ package stackdriver
import (
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/util/wait"
@ -107,6 +108,39 @@ var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackd
err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
framework.ExpectNoError(err)
})
ginkgo.By("Checking that too long lines are trimmed", func() {
originalLength := 100001
cmd := []string{
"/bin/sh",
"-c",
fmt.Sprintf("while :; do printf '%%*s' %d | tr ' ' 'A'; echo; sleep 1; done", originalLength),
}
trimPrefix := "[Trimmed]"
pod, err := utils.StartAndReturnSelf(utils.NewExecLoggingPod("synthlogger-4", cmd), f)
framework.ExpectNoError(err, "Failed to start a pod")
ginkgo.By("Waiting for logs to ingest")
c := utils.NewLogChecker(p, func(_ string, logEntries []utils.LogEntry) (bool, error) {
if len(logEntries) == 0 {
return false, nil
}
log := logEntries[0]
if log.JSONPayload != nil {
return false, fmt.Errorf("got json log entry %v, wanted plain text", log.JSONPayload)
}
if len(log.TextPayload) == originalLength {
return false, fmt.Errorf("got non-trimmed entry of length %d", len(log.TextPayload))
}
if !strings.HasPrefix(log.TextPayload, trimPrefix) {
return false, fmt.Errorf("got message without prefix '%s': %s", trimPrefix, log.TextPayload)
}
return true, nil
}, utils.JustTimeout, pod.Name())
err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
framework.ExpectNoError(err)
})
})
})

View File

@ -134,27 +134,38 @@ func (p *loadLoggingPod) ExpectedLineCount() int {
return p.expectedLinesCount
}
var _ LoggingPod = &repeatingLoggingPod{}
type repeatingLoggingPod struct {
name string
line string
}
// NewRepeatingLoggingPod returns a logging pod that each second prints
// line value to its stdout.
func NewRepeatingLoggingPod(podName string, line string) LoggingPod {
return &repeatingLoggingPod{
cmd := []string{
"/bin/sh",
"-c",
fmt.Sprintf("while :; do echo '%s'; sleep 1; done", line),
}
return NewExecLoggingPod(podName, cmd)
}
var _ LoggingPod = &execLoggingPod{}
type execLoggingPod struct {
name string
cmd []string
}
// NewExecLoggingPod returns a logging pod that produces logs through
// executing a command, passed in cmd.
func NewExecLoggingPod(podName string, cmd []string) LoggingPod {
return &execLoggingPod{
name: podName,
line: line,
cmd: cmd,
}
}
func (p *repeatingLoggingPod) Name() string {
func (p *execLoggingPod) Name() string {
return p.name
}
func (p *repeatingLoggingPod) Start(f *framework.Framework) error {
func (p *execLoggingPod) Start(f *framework.Framework) error {
framework.Logf("Starting repeating logging pod %s", p.name)
f.PodClient().Create(&api_v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
@ -163,13 +174,9 @@ func (p *repeatingLoggingPod) Start(f *framework.Framework) error {
Spec: api_v1.PodSpec{
Containers: []api_v1.Container{
{
Name: loggingContainerName,
Image: "busybox",
Command: []string{
"/bin/sh",
"-c",
fmt.Sprintf("while :; do echo '%s'; sleep 1; done", p.line),
},
Name: loggingContainerName,
Image: "busybox",
Command: p.cmd,
Resources: api_v1.ResourceRequirements{
Requests: api_v1.ResourceList{
api_v1.ResourceCPU: *resource.NewMilliQuantity(