diff --git a/test/e2e/BUILD b/test/e2e/BUILD
index fa5dae29480..c54aef0c508 100644
--- a/test/e2e/BUILD
+++ b/test/e2e/BUILD
@@ -208,6 +208,7 @@ go_library(
         "//vendor:k8s.io/client-go/tools/cache",
         "//vendor:k8s.io/client-go/transport",
         "//vendor:k8s.io/client-go/util/flowcontrol",
+        "//vendor:k8s.io/client-go/util/integer",
         "//vendor:k8s.io/client-go/util/workqueue",
     ],
 )
diff --git a/test/e2e/cluster_logging_es.go b/test/e2e/cluster_logging_es.go
index d9199454a11..e971bddf4de 100644
--- a/test/e2e/cluster_logging_es.go
+++ b/test/e2e/cluster_logging_es.go
@@ -51,11 +51,14 @@ var _ = framework.KubeDescribe("Cluster level logging using Elasticsearch [Featu
 		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName))
 
 		By("Waiting for logs to ingest")
-		err = waitForLogsIngestion(esLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0)
-		framework.ExpectNoError(err, "Failed to ingest logs")
-
-		if err != nil {
-			reportLogsFromFluentdPod(f, pod)
+		config := &loggingTestConfig{
+			LogsProvider:              esLogsProvider,
+			Pods:                      []*loggingPod{pod},
+			IngestionTimeout:          10 * time.Minute,
+			MaxAllowedLostFraction:    0,
+			MaxAllowedFluentdRestarts: 0,
 		}
+		err = waitForLogsIngestion(f, config)
+		framework.ExpectNoError(err, "Failed to ingest logs")
 	})
 })
diff --git a/test/e2e/cluster_logging_es_utils.go b/test/e2e/cluster_logging_es_utils.go
index e9704a6038e..c83ad8ada90 100644
--- a/test/e2e/cluster_logging_es_utils.go
+++ b/test/e2e/cluster_logging_es_utils.go
@@ -46,6 +46,10 @@ func newEsLogsProvider(f *framework.Framework) (*esLogsProvider, error) {
 	return &esLogsProvider{Framework: f}, nil
 }
 
+func (logsProvider *esLogsProvider) FluentdApplicationName() string {
+	return "fluentd-es"
+}
+
 // Ensures that elasticsearch is running and ready to serve requests
 func (logsProvider *esLogsProvider) EnsureWorking() error {
 	f := logsProvider.Framework
diff --git a/test/e2e/cluster_logging_gcl.go b/test/e2e/cluster_logging_gcl.go
index afb97207698..97a1584e13a 100644
--- a/test/e2e/cluster_logging_gcl.go
+++ b/test/e2e/cluster_logging_gcl.go
@@ -50,11 +50,14 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Flaky]", func()
 		framework.ExpectNoError(err, fmt.Sprintf("Should've successfully waited for pod %s to succeed", podName))
 
 		By("Waiting for logs to ingest")
-		err = waitForLogsIngestion(gclLogsProvider, []*loggingPod{pod}, 10*time.Minute, 0)
-		framework.ExpectNoError(err, "Failed to ingest logs")
-
-		if err != nil {
-			reportLogsFromFluentdPod(f, pod)
+		config := &loggingTestConfig{
+			LogsProvider:              gclLogsProvider,
+			Pods:                      []*loggingPod{pod},
+			IngestionTimeout:          10 * time.Minute,
+			MaxAllowedLostFraction:    0,
+			MaxAllowedFluentdRestarts: 0,
 		}
+		err = waitForLogsIngestion(f, config)
+		framework.ExpectNoError(err, "Failed to ingest logs")
 	})
 })
diff --git a/test/e2e/cluster_logging_gcl_load.go b/test/e2e/cluster_logging_gcl_load.go
index 440cf90c807..bae6c5b1ce0 100644
--- a/test/e2e/cluster_logging_gcl_load.go
+++ b/test/e2e/cluster_logging_gcl_load.go
@@ -28,7 +28,8 @@ import (
 
 const (
 	// TODO(crassirostris): Once test is stable, decrease allowed loses
-	loadTestMaxAllowedLostFraction = 0.1
+	loadTestMaxAllowedLostFraction    = 0.1
+	loadTestMaxAllowedFluentdRestarts = 1
 )
 
 // TODO(crassirostris): Remove Flaky once test is stable
@@ -58,7 +59,14 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Slow] [Flaky]",
 		time.Sleep(loggingDuration)
 
 		By("Waiting for all log lines to be ingested")
-		err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction)
+		config := &loggingTestConfig{
+			LogsProvider:              gclLogsProvider,
+			Pods:                      pods,
+			IngestionTimeout:          ingestionTimeout,
+			MaxAllowedLostFraction:    loadTestMaxAllowedLostFraction,
+			MaxAllowedFluentdRestarts: loadTestMaxAllowedFluentdRestarts,
+		}
+		err = waitForLogsIngestion(f, config)
 		if err != nil {
 			framework.Failf("Failed to ingest logs: %v", err)
 		} else {
@@ -96,7 +104,14 @@ var _ = framework.KubeDescribe("Cluster level logging using GCL [Slow] [Flaky]",
 		time.Sleep(jobDuration)
 
 		By("Waiting for all log lines to be ingested")
-		err = waitForLogsIngestion(gclLogsProvider, pods, ingestionTimeout, loadTestMaxAllowedLostFraction)
+		config := &loggingTestConfig{
+			LogsProvider:              gclLogsProvider,
+			Pods:                      pods,
+			IngestionTimeout:          ingestionTimeout,
+			MaxAllowedLostFraction:    loadTestMaxAllowedLostFraction,
+			MaxAllowedFluentdRestarts: loadTestMaxAllowedFluentdRestarts,
+		}
+		err = waitForLogsIngestion(f, config)
 		if err != nil {
 			framework.Failf("Failed to ingest logs: %v", err)
 		} else {
diff --git a/test/e2e/cluster_logging_gcl_utils.go b/test/e2e/cluster_logging_gcl_utils.go
index c372691634a..8efa5a8191f 100644
--- a/test/e2e/cluster_logging_gcl_utils.go
+++ b/test/e2e/cluster_logging_gcl_utils.go
@@ -64,6 +64,10 @@ func newGclLogsProvider(f *framework.Framework) (*gclLogsProvider, error) {
 	return provider, nil
 }
 
+func (logsProvider *gclLogsProvider) FluentdApplicationName() string {
+	return "fluentd-gcp"
+}
+
 // Since GCL API is not easily available from the outside of cluster
 // we use gcloud command to perform search with filter
 func (gclLogsProvider *gclLogsProvider) ReadEntries(pod *loggingPod) []*logEntry {
diff --git a/test/e2e/cluster_logging_utils.go b/test/e2e/cluster_logging_utils.go
index 36ac557a1bf..a9b53652481 100644
--- a/test/e2e/cluster_logging_utils.go
+++ b/test/e2e/cluster_logging_utils.go
@@ -17,7 +17,6 @@ limitations under the License.
 package e2e
 
 import (
-	"errors"
 	"fmt"
 	"strconv"
 	"strings"
@@ -26,6 +25,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/client-go/util/integer"
 	"k8s.io/kubernetes/pkg/api"
 	api_v1 "k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -63,10 +63,19 @@ type logEntry struct {
 }
 
 type logsProvider interface {
+	FluentdApplicationName() string
 	EnsureWorking() error
 	ReadEntries(*loggingPod) []*logEntry
 }
 
+type loggingTestConfig struct {
+	LogsProvider              logsProvider
+	Pods                      []*loggingPod
+	IngestionTimeout          time.Duration
+	MaxAllowedLostFraction    float64
+	MaxAllowedFluentdRestarts int
+}
+
 func (entry *logEntry) getLogEntryNumber() (int, bool) {
 	chunks := strings.Split(entry.Payload, " ")
 	lineNumber, err := strconv.Atoi(strings.TrimSpace(chunks[0]))
@@ -123,27 +132,27 @@ func createLogsGeneratorPod(f *framework.Framework, podName string, linesCount i
 	})
 }
 
-func waitForLogsIngestion(logsProvider logsProvider, pods []*loggingPod, ingestionTimeout time.Duration, maxAllowedLostFraction float64) error {
+func waitForLogsIngestion(f *framework.Framework, config *loggingTestConfig) error {
 	expectedLinesNumber := 0
-	for _, pod := range pods {
+	for _, pod := range config.Pods {
 		expectedLinesNumber += pod.ExpectedLinesNumber
 	}
 
 	totalMissing := expectedLinesNumber
 
-	missingByPod := make([]int, len(pods))
-	for podIdx, pod := range pods {
+	missingByPod := make([]int, len(config.Pods))
+	for podIdx, pod := range config.Pods {
 		missingByPod[podIdx] = pod.ExpectedLinesNumber
 	}
 
-	for start := time.Now(); totalMissing > 0 && time.Since(start) < ingestionTimeout; time.Sleep(ingestionRetryDelay) {
+	for start := time.Now(); totalMissing > 0 && time.Since(start) < config.IngestionTimeout; time.Sleep(ingestionRetryDelay) {
 		missing := 0
-		for podIdx, pod := range pods {
+		for podIdx, pod := range config.Pods {
 			if missingByPod[podIdx] == 0 {
 				continue
 			}
 
-			missingByPod[podIdx] = pullMissingLogsCount(logsProvider, pod)
+			missingByPod[podIdx] = pullMissingLogsCount(config.LogsProvider, pod)
 			missing += missingByPod[podIdx]
 		}
 
@@ -156,13 +165,32 @@ func waitForLogsIngestion(logsProvider logsProvider, pods []*loggingPod, ingesti
 	lostFraction := float64(totalMissing) / float64(expectedLinesNumber)
 
 	if totalMissing > 0 {
-		framework.Logf("After %v still missing %d lines, %.2f%% of total number oflines",
-			ingestionTimeout, totalMissing, lostFraction*100)
+		framework.Logf("After %v still missing %d lines, %.2f%% of total number of lines",
+			config.IngestionTimeout, totalMissing, lostFraction*100)
 	}
 
-	if lostFraction > maxAllowedLostFraction {
+	if lostFraction > config.MaxAllowedLostFraction {
 		return fmt.Errorf("lost %.2f%% of lines, but only loss of %.2f%% can be tolerated",
-			lostFraction*100, maxAllowedLostFraction*100)
+			lostFraction*100, config.MaxAllowedLostFraction*100)
+	}
+
+	fluentdPods, err := getFluentdPods(f, config.LogsProvider.FluentdApplicationName())
+	if err != nil {
+		return fmt.Errorf("failed to get fluentd pods due to %v", err)
+	}
+
+	maxRestartCount := 0
+	for _, fluentdPod := range fluentdPods.Items {
+		restartCount := int(fluentdPod.Status.ContainerStatuses[0].RestartCount)
+		maxRestartCount = integer.IntMax(maxRestartCount, restartCount)
+
+		framework.Logf("Fluentd pod %s on node %s was restarted %d times",
+			fluentdPod.Name, fluentdPod.Spec.NodeName, restartCount)
+	}
+
+	if maxRestartCount > config.MaxAllowedFluentdRestarts {
+		return fmt.Errorf("max fluentd pod restarts was %d, which is more than allowed %d",
+			maxRestartCount, config.MaxAllowedFluentdRestarts)
 	}
 
 	return nil
@@ -211,32 +239,8 @@ func getMissingLinesCount(logsProvider logsProvider, pod *loggingPod) (int, erro
 	return pod.ExpectedLinesNumber - len(pod.Occurrences), nil
 }
 
-func reportLogsFromFluentdPod(f *framework.Framework, pod *loggingPod) error {
-	synthLoggerPod, err := f.PodClient().Get(pod.Name, meta_v1.GetOptions{})
-	if err != nil {
-		return fmt.Errorf("failed to get synth logger pod due to %v", err)
-	}
-
-	synthLoggerNodeName := synthLoggerPod.Spec.NodeName
-	if synthLoggerNodeName == "" {
-		return errors.New("Synthlogger pod is not assigned to the node")
-	}
-
-	label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "fluentd-logging"}))
+func getFluentdPods(f *framework.Framework, fluentdApplicationName string) (*api_v1.PodList, error) {
+	label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": fluentdApplicationName}))
 	options := meta_v1.ListOptions{LabelSelector: label.String()}
-	fluentdPods, err := f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
-
-	for _, fluentdPod := range fluentdPods.Items {
-		if fluentdPod.Spec.NodeName == synthLoggerNodeName {
-			containerName := fluentdPod.Spec.Containers[0].Name
-			logs, err := framework.GetPodLogs(f.ClientSet, meta_v1.NamespaceSystem, fluentdPod.Name, containerName)
-			if err != nil {
-				return fmt.Errorf("failed to get logs from fluentd pod %s due to %v", fluentdPod.Name, err)
-			}
-			framework.Logf("Logs from fluentd pod %s:\n%s", fluentdPod.Name, logs)
-			return nil
-		}
-	}
-
-	return fmt.Errorf("failed to find fluentd pod running on node %s", synthLoggerNodeName)
+	return f.ClientSet.Core().Pods(api.NamespaceSystem).List(options)
 }