diff --git a/cluster/saltbase/salt/fluentd-es/fluentd-es.yaml b/cluster/saltbase/salt/fluentd-es/fluentd-es.yaml
index 24219449472..634015bf65c 100644
--- a/cluster/saltbase/salt/fluentd-es/fluentd-es.yaml
+++ b/cluster/saltbase/salt/fluentd-es/fluentd-es.yaml
@@ -3,6 +3,8 @@ kind: Pod
 metadata:
   name: fluentd-elasticsearch
   namespace: kube-system
+  labels:
+    k8s-app: fluentd-logging
 spec:
   containers:
   - name: fluentd-elasticsearch
diff --git a/cluster/saltbase/salt/fluentd-gcp/fluentd-gcp.yaml b/cluster/saltbase/salt/fluentd-gcp/fluentd-gcp.yaml
index dcc3322f24f..90a7d86a019 100644
--- a/cluster/saltbase/salt/fluentd-gcp/fluentd-gcp.yaml
+++ b/cluster/saltbase/salt/fluentd-gcp/fluentd-gcp.yaml
@@ -3,6 +3,8 @@ kind: Pod
 metadata:
   name: fluentd-cloud-logging
   namespace: kube-system
+  labels:
+    k8s-app: fluentd-logging
 spec:
   containers:
   - name: fluentd-cloud-logging
diff --git a/docs/getting-started-guides/logging.md b/docs/getting-started-guides/logging.md
index 2a195c0747e..5f53fa8599c 100644
--- a/docs/getting-started-guides/logging.md
+++ b/docs/getting-started-guides/logging.md
@@ -166,6 +166,8 @@ kind: Pod
 metadata:
   name: fluentd-cloud-logging
   namespace: kube-system
+  labels:
+    k8s-app: fluentd-logging
 spec:
   containers:
   - name: fluentd-cloud-logging
diff --git a/test/e2e/es_cluster_logging.go b/test/e2e/es_cluster_logging.go
index 6af93699e40..21b359d2ae9 100644
--- a/test/e2e/es_cluster_logging.go
+++ b/test/e2e/es_cluster_logging.go
@@ -46,8 +46,9 @@ var _ = Describe("Cluster level logging using Elasticsearch", func() {
 })
 
 const (
-	esKey   = "k8s-app"
-	esValue = "elasticsearch-logging"
+	k8sAppKey    = "k8s-app"
+	esValue      = "elasticsearch-logging"
+	fluentdValue = "fluentd-logging"
 )
 
 func bodyToJSON(body []byte) (map[string]interface{}, error) {
@@ -59,6 +60,15 @@ func bodyToJSON(body []byte) (map[string]interface{}, error) {
 	return r, nil
 }
 
+func nodeInNodeList(nodeName string, nodeList *api.NodeList) bool {
+	for _, node := range nodeList.Items {
+		if nodeName == node.Name {
+			return true
+		}
+	}
+	return false
+}
+
 // ClusterLevelLoggingWithElasticsearch is an end to end test for cluster level logging.
 func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 	// graceTime is how long to keep retrying requests for status information.
@@ -83,7 +93,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 
 	// Wait for the Elasticsearch pods to enter the running state.
 	By("Checking to make sure the Elasticsearch pods are running")
-	label := labels.SelectorFromSet(labels.Set(map[string]string{esKey: esValue}))
+	label := labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: esValue}))
 	options := api.ListOptions{LabelSelector: label}
 	pods, err := f.Client.Pods(api.NamespaceSystem).List(options)
 	Expect(err).NotTo(HaveOccurred())
@@ -152,13 +162,14 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 			Resource("services").
 			Name("elasticsearch-logging").
 			Suffix("_cluster/health").
-			Param("health", "pretty").
+			Param("level", "indices").
 			DoRaw()
 		if err != nil {
 			continue
 		}
 		health, err := bodyToJSON(body)
 		if err != nil {
+			Logf("Bad json response from elasticsearch: %v", err)
 			continue
 		}
 		statusIntf, ok := health["status"]
@@ -168,7 +179,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 		}
 		status := statusIntf.(string)
 		if status != "green" && status != "yellow" {
-			Logf("Cluster health has bad status: %s", status)
+			Logf("Cluster health has bad status: %v", health)
 			continue
 		}
 		if err == nil && ok {
@@ -202,6 +213,33 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 	}
 	Logf("Found %d healthy nodes.", len(nodes.Items))
 
+	// Wait for the Fluentd pods to enter the running state.
+	By("Checking to make sure the Fluentd pods are running on each healthy node")
+	label = labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: fluentdValue}))
+	options = api.ListOptions{LabelSelector: label}
+	pods, err = f.Client.Pods(api.NamespaceSystem).List(options)
+	Expect(err).NotTo(HaveOccurred())
+	for _, pod := range pods.Items {
+		if nodeInNodeList(pod.Spec.NodeName, nodes) {
+			err = waitForPodRunningInNamespace(f.Client, pod.Name, api.NamespaceSystem)
+			Expect(err).NotTo(HaveOccurred())
+		}
+	}
+
+	// Check that each healthy node has a fluentd pod running on it.
+	for _, node := range nodes.Items {
+		exists := false
+		for _, pod := range pods.Items {
+			if pod.Spec.NodeName == node.Name {
+				exists = true
+				break
+			}
+		}
+		if !exists {
+			Failf("Node %v does not have a fluentd pod running on it.", node.Name)
+		}
+	}
+
 	// Create a unique root name for the resources in this test to permit
 	// parallel executions of this test.
 	// Use a unique namespace for the resources created in this test.
@@ -268,7 +306,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 	for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(10 * time.Second) {
 
 		// Debugging code to report the status of the elasticsearch logging endpoints.
-		selector := labels.Set{esKey: esValue}.AsSelector()
+		selector := labels.Set{k8sAppKey: esValue}.AsSelector()
 		options := api.ListOptions{LabelSelector: selector}
 		esPods, err := f.Client.Pods(api.NamespaceSystem).List(options)
 		if err != nil {
@@ -386,6 +424,13 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 	for n := range missingPerNode {
 		if missingPerNode[n] > 0 {
 			Logf("Node %d is missing %d logs", n, missingPerNode[n])
+			opts := &api.PodLogOptions{}
+			body, err = f.Client.Pods(ns).GetLogs(podNames[n], opts).DoRaw()
+			if err != nil {
+				Logf("Cannot get logs from pod %v", podNames[n])
+				continue
+			}
+			Logf("Pod %s has the following logs: %s", podNames[n], body)
 		}
 	}
 	Failf("Failed to find all %d log lines", expected)
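
For context, the new k8s-app: fluentd-logging label is what lets the e2e test locate the fluentd pods with the same selector machinery it already uses for Elasticsearch. Below is a minimal sketch of that lookup outside the test framework; the selector, namespace, and List call shapes come from the diff above, while the import paths, the client.Client type, and the listFluentdPods helper name are assumptions for illustration, not part of this change.

package main

import (
	"fmt"

	// NOTE: import paths are assumed to match the pre-1.0 tree being patched.
	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/labels"
)

// listFluentdPods lists the fluentd pods in kube-system by the
// k8s-app=fluentd-logging label added in this change, mirroring the
// selector logic used in the e2e test.
func listFluentdPods(c *client.Client) error {
	selector := labels.SelectorFromSet(labels.Set{"k8s-app": "fluentd-logging"})
	pods, err := c.Pods(api.NamespaceSystem).List(api.ListOptions{LabelSelector: selector})
	if err != nil {
		return err
	}
	for _, pod := range pods.Items {
		fmt.Printf("pod %s is scheduled on node %s\n", pod.Name, pod.Spec.NodeName)
	}
	return nil
}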