Merge pull request #52289 from crassirostris/sd-logging-trim-long-lines

Automatic merge from submit-queue (batch tested with PRs 52316, 52289, 52375)

[fluentd-gcp addon] Trim too long log entries due to Stackdriver limitations

Stackdriver doesn't support log entries bigger than 100KB, so by default fluentd plugin just drops such entries. To avoid that and increase the visibility of this problem it's suggested to trim long lines instead.

/cc @igorpeshansky

```release-note
[fluentd-gcp addon] Fluentd will trim lines exceeding 100KB instead of dropping them.
```
This commit is contained in:
Kubernetes Submit Queue
2017-09-13 04:04:52 -07:00
committed by GitHub
4 changed files with 73 additions and 20 deletions

View File

@@ -345,6 +345,18 @@ data:
</metric>
</filter>
# TODO(instrumentation): Reconsider this workaround later.
# Trim the entries which exceed slightly less than 100KB, to avoid
# dropping them. It is a necessity, because Stackdriver only supports
# entries that are up to 100KB in size.
<filter kubernetes.**>
@type record_transformer
enable_ruby true
<record>
log ${record['log'].length > 100000 ? "[Trimmed]#{record['log'][0..100000]}..." : record['log']}
</record>
</filter>
# We use 2 output stanzas - one to handle the container logs and one to handle
# the node daemon logs, the latter of which explicitly sends its logs to the
# compute.googleapis.com service rather than container.googleapis.com to keep
@@ -396,7 +408,7 @@ data:
num_threads 2
</match>
metadata:
name: fluentd-gcp-config-v1.1.2
name: fluentd-gcp-config-v1.2.0
namespace: kube-system
labels:
addonmanager.kubernetes.io/mode: Reconcile

View File

@@ -117,7 +117,7 @@ spec:
path: /usr/lib64
- name: config-volume
configMap:
name: fluentd-gcp-config-v1.1.2
name: fluentd-gcp-config-v1.2.0
- name: ssl-certs
hostPath:
path: /etc/ssl/certs

View File

@@ -18,6 +18,7 @@ package stackdriver
import (
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/util/wait"
@@ -107,6 +108,39 @@ var _ = instrumentation.SIGDescribe("Cluster level logging implemented by Stackd
err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
framework.ExpectNoError(err)
})
ginkgo.By("Checking that too long lines are trimmed", func() {
originalLength := 100001
cmd := []string{
"/bin/sh",
"-c",
fmt.Sprintf("while :; do printf '%%*s' %d | tr ' ' 'A'; echo; sleep 1; done", originalLength),
}
trimPrefix := "[Trimmed]"
pod, err := utils.StartAndReturnSelf(utils.NewExecLoggingPod("synthlogger-4", cmd), f)
framework.ExpectNoError(err, "Failed to start a pod")
ginkgo.By("Waiting for logs to ingest")
c := utils.NewLogChecker(p, func(_ string, logEntries []utils.LogEntry) (bool, error) {
if len(logEntries) == 0 {
return false, nil
}
log := logEntries[0]
if log.JSONPayload != nil {
return false, fmt.Errorf("got json log entry %v, wanted plain text", log.JSONPayload)
}
if len(log.TextPayload) == originalLength {
return false, fmt.Errorf("got non-trimmed entry of length %d", len(log.TextPayload))
}
if !strings.HasPrefix(log.TextPayload, trimPrefix) {
return false, fmt.Errorf("got message without prefix '%s': %s", trimPrefix, log.TextPayload)
}
return true, nil
}, utils.JustTimeout, pod.Name())
err = utils.WaitForLogs(c, ingestionInterval, ingestionTimeout)
framework.ExpectNoError(err)
})
})
})

View File

@@ -134,27 +134,38 @@ func (p *loadLoggingPod) ExpectedLineCount() int {
return p.expectedLinesCount
}
var _ LoggingPod = &repeatingLoggingPod{}
type repeatingLoggingPod struct {
name string
line string
}
// NewRepeatingLoggingPod returns a logging pod that each second prints
// line value to its stdout.
func NewRepeatingLoggingPod(podName string, line string) LoggingPod {
return &repeatingLoggingPod{
cmd := []string{
"/bin/sh",
"-c",
fmt.Sprintf("while :; do echo '%s'; sleep 1; done", line),
}
return NewExecLoggingPod(podName, cmd)
}
var _ LoggingPod = &execLoggingPod{}
type execLoggingPod struct {
name string
cmd []string
}
// NewExecLoggingPod returns a logging pod that produces logs through
// executing a command, passed in cmd.
func NewExecLoggingPod(podName string, cmd []string) LoggingPod {
return &execLoggingPod{
name: podName,
line: line,
cmd: cmd,
}
}
func (p *repeatingLoggingPod) Name() string {
func (p *execLoggingPod) Name() string {
return p.name
}
func (p *repeatingLoggingPod) Start(f *framework.Framework) error {
func (p *execLoggingPod) Start(f *framework.Framework) error {
framework.Logf("Starting repeating logging pod %s", p.name)
f.PodClient().Create(&api_v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
@@ -163,13 +174,9 @@ func (p *repeatingLoggingPod) Start(f *framework.Framework) error {
Spec: api_v1.PodSpec{
Containers: []api_v1.Container{
{
Name: loggingContainerName,
Image: "busybox",
Command: []string{
"/bin/sh",
"-c",
fmt.Sprintf("while :; do echo '%s'; sleep 1; done", p.line),
},
Name: loggingContainerName,
Image: "busybox",
Command: p.cmd,
Resources: api_v1.ResourceRequirements{
Requests: api_v1.ResourceList{
api_v1.ResourceCPU: *resource.NewMilliQuantity(