Merge pull request #18448 from freehan/eslogging

add validation for fluentd pods
Author: Minhan Xia, 2015-12-15 13:25:21 -08:00
Commit: 2c9ffc1b45
4 changed files with 57 additions and 6 deletions

Changed file 1 of 4: the fluentd-elasticsearch pod manifest

@@ -3,6 +3,8 @@ kind: Pod
 metadata:
   name: fluentd-elasticsearch
   namespace: kube-system
+  labels:
+    k8s-app: fluentd-logging
 spec:
   containers:
   - name: fluentd-elasticsearch

Changed file 2 of 4: the fluentd-cloud-logging pod manifest

@@ -3,6 +3,8 @@ kind: Pod
 metadata:
   name: fluentd-cloud-logging
   namespace: kube-system
+  labels:
+    k8s-app: fluentd-logging
 spec:
   containers:
   - name: fluentd-cloud-logging

Changed file 3 of 4: a larger file embedding the same fluentd-cloud-logging pod spec (path not shown in this view)

@@ -166,6 +166,8 @@ kind: Pod
 metadata:
   name: fluentd-cloud-logging
   namespace: kube-system
+  labels:
+    k8s-app: fluentd-logging
 spec:
   containers:
   - name: fluentd-cloud-logging
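
All three manifests gain the same k8s-app: fluentd-logging label. That label is the hook the new e2e validation (file 4 below) selects on, and it is also why the test renames esKey to the shared k8sAppKey: the k8s-app key is now used for both the Elasticsearch and the fluentd selectors. As a minimal sketch of how such a label is consumed, assuming the 2015-era unversioned client used elsewhere in the e2e framework (the helper name and import paths are illustrative, not part of this commit):

package e2e

import (
	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/labels"
)

// listFluentdPods lists the fluentd pods in kube-system by the label
// that the three manifests above now carry.
func listFluentdPods(c *client.Client) (*api.PodList, error) {
	selector := labels.SelectorFromSet(labels.Set{"k8s-app": "fluentd-logging"})
	return c.Pods(api.NamespaceSystem).List(api.ListOptions{LabelSelector: selector})
}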

Changed file 4 of 4: the Elasticsearch cluster-level logging e2e test

@@ -46,8 +46,9 @@ var _ = Describe("Cluster level logging using Elasticsearch", func() {
 })

 const (
-	esKey   = "k8s-app"
-	esValue = "elasticsearch-logging"
+	k8sAppKey    = "k8s-app"
+	esValue      = "elasticsearch-logging"
+	fluentdValue = "fluentd-logging"
 )

 func bodyToJSON(body []byte) (map[string]interface{}, error) {
@@ -59,6 +60,15 @@ func bodyToJSON(body []byte) (map[string]interface{}, error) {
 	return r, nil
 }

+func nodeInNodeList(nodeName string, nodeList *api.NodeList) bool {
+	for _, node := range nodeList.Items {
+		if nodeName == node.Name {
+			return true
+		}
+	}
+	return false
+}
+
 // ClusterLevelLoggingWithElasticsearch is an end to end test for cluster level logging.
 func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 	// graceTime is how long to keep retrying requests for status information.
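
The new nodeInNodeList helper re-scans the node list once per fluentd pod. That is fine at e2e scale, but a set-based lookup is the usual alternative; a sketch under the same api types (hypothetical, not part of the commit):

// nodeNameSet builds a name set once so each subsequent membership test is
// an O(1) map lookup instead of a scan over nodeList.Items.
func nodeNameSet(nodeList *api.NodeList) map[string]bool {
	set := make(map[string]bool, len(nodeList.Items))
	for _, node := range nodeList.Items {
		set[node.Name] = true
	}
	return set
}

A caller would build the set once before the pod loop and then test set[pod.Spec.NodeName].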
@@ -83,7 +93,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {

 	// Wait for the Elasticsearch pods to enter the running state.
 	By("Checking to make sure the Elasticsearch pods are running")
-	label := labels.SelectorFromSet(labels.Set(map[string]string{esKey: esValue}))
+	label := labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: esValue}))
 	options := api.ListOptions{LabelSelector: label}
 	pods, err := f.Client.Pods(api.NamespaceSystem).List(options)
 	Expect(err).NotTo(HaveOccurred())
@@ -152,13 +162,14 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 			Resource("services").
 			Name("elasticsearch-logging").
 			Suffix("_cluster/health").
-			Param("health", "pretty").
+			Param("level", "indices").
 			DoRaw()
 		if err != nil {
 			continue
 		}
 		health, err := bodyToJSON(body)
 		if err != nil {
+			Logf("Bad json response from elasticsearch: %v", err)
 			continue
 		}
 		statusIntf, ok := health["status"]
@@ -168,7 +179,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 		}
 		status := statusIntf.(string)
 		if status != "green" && status != "yellow" {
-			Logf("Cluster health has bad status: %s", status)
+			Logf("Cluster health has bad status: %v", health)
 			continue
 		}
 		if err == nil && ok {
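
Two debuggability fixes land in this hunk alongside the rename. The Param change replaces a query parameter Elasticsearch ignores (health=pretty; pretty-printing is requested with pretty=true) with a real one: level=indices asks _cluster/health to include per-index health, so the full health map now logged on a bad status is actually informative. For a typed view instead of the test's generic bodyToJSON map, the relevant response shape would look like this (status and indices are documented _cluster/health fields; the struct itself is illustrative):

package e2e

import "encoding/json"

// clusterHealth mirrors the parts of the _cluster/health?level=indices
// response this test inspects.
type clusterHealth struct {
	Status  string `json:"status"` // "green", "yellow", or "red"
	Indices map[string]struct {
		Status string `json:"status"`
	} `json:"indices"`
}

func parseClusterHealth(body []byte) (*clusterHealth, error) {
	var h clusterHealth
	if err := json.Unmarshal(body, &h); err != nil {
		return nil, err
	}
	return &h, nil
}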
@@ -202,6 +213,33 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 	}
 	Logf("Found %d healthy nodes.", len(nodes.Items))

+	// Wait for the Fluentd pods to enter the running state.
+	By("Checking to make sure the Fluentd pods are running on each healthy node")
+	label = labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: fluentdValue}))
+	options = api.ListOptions{LabelSelector: label}
+	pods, err = f.Client.Pods(api.NamespaceSystem).List(options)
+	Expect(err).NotTo(HaveOccurred())
+	for _, pod := range pods.Items {
+		if nodeInNodeList(pod.Spec.NodeName, nodes) {
+			err = waitForPodRunningInNamespace(f.Client, pod.Name, api.NamespaceSystem)
+			Expect(err).NotTo(HaveOccurred())
+		}
+	}
+
+	// Check that each healthy node has a fluentd pod running on it.
+	for _, node := range nodes.Items {
+		exists := false
+		for _, pod := range pods.Items {
+			if pod.Spec.NodeName == node.Name {
+				exists = true
+				break
+			}
+		}
+		if !exists {
+			Failf("Node %v does not have fluentd pod running on it.", node.Name)
+		}
+	}
+
 	// Create a unique root name for the resources in this test to permit
 	// parallel executions of this test.
 	// Use a unique namespace for the resources created in this test.
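
This block is the validation the commit title promises, done in two passes: first wait for every fluentd pod scheduled onto a healthy node to reach Running, then fail if any healthy node carries no fluentd pod at all. The coverage pass is O(nodes × pods); a single-pass variant in the spirit of the set idea above (sketch, not in the commit):

// uncoveredNodes marks every node hosting a fluentd pod, then returns the
// healthy nodes that were never marked.
func uncoveredNodes(nodes *api.NodeList, pods *api.PodList) []string {
	covered := make(map[string]bool, len(pods.Items))
	for _, pod := range pods.Items {
		covered[pod.Spec.NodeName] = true
	}
	var missing []string
	for _, node := range nodes.Items {
		if !covered[node.Name] {
			missing = append(missing, node.Name)
		}
	}
	return missing
}

With this shape the test could Failf once with every uncovered node rather than stopping at the first.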
@@ -268,7 +306,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 	for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(10 * time.Second) {

 		// Debugging code to report the status of the elasticsearch logging endpoints.
-		selector := labels.Set{esKey: esValue}.AsSelector()
+		selector := labels.Set{k8sAppKey: esValue}.AsSelector()
 		options := api.ListOptions{LabelSelector: selector}
 		esPods, err := f.Client.Pods(api.NamespaceSystem).List(options)
 		if err != nil {
@@ -386,6 +424,13 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
 	for n := range missingPerNode {
 		if missingPerNode[n] > 0 {
 			Logf("Node %d is missing %d logs", n, missingPerNode[n])
+			opts := &api.PodLogOptions{}
+			body, err = f.Client.Pods(ns).GetLogs(podNames[n], opts).DoRaw()
+			if err != nil {
+				Logf("Cannot get logs from pod %v", podNames[n])
+				continue
+			}
+			Logf("Pod %s has the following logs: %s", podNames[n], body)
 		}
 	}
 	Failf("Failed to find all %d log lines", expected)