Merge pull request #19383 from freehan/esloggingfix

Improve ES test resilience
This commit is contained in:
Minhan Xia 2016-01-11 10:08:50 -08:00
commit 2a7656c0c0
2 changed files with 44 additions and 19 deletions

View File

@ -12,8 +12,6 @@ spec:
resources: resources:
limits: limits:
cpu: 100m cpu: 100m
args:
- -q
volumeMounts: volumeMounts:
- name: varlog - name: varlog
mountPath: /var/log mountPath: /var/log

View File

@ -76,7 +76,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
const graceTime = 5 * time.Minute const graceTime = 5 * time.Minute
// ingestionTimeout is how long to keep retrying to wait for all the // ingestionTimeout is how long to keep retrying to wait for all the
// logs to be ingested. // logs to be ingested.
const ingestionTimeout = 3 * time.Minute const ingestionTimeout = 10 * time.Minute
// Check for the existence of the Elasticsearch service. // Check for the existence of the Elasticsearch service.
By("Checking the Elasticsearch service exists.") By("Checking the Elasticsearch service exists.")
@ -219,9 +219,9 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
By("Checking to make sure the Fluentd pod are running on each healthy node") By("Checking to make sure the Fluentd pod are running on each healthy node")
label = labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: fluentdValue})) label = labels.SelectorFromSet(labels.Set(map[string]string{k8sAppKey: fluentdValue}))
options = api.ListOptions{LabelSelector: label} options = api.ListOptions{LabelSelector: label}
pods, err = f.Client.Pods(api.NamespaceSystem).List(options) fluentdPods, err := f.Client.Pods(api.NamespaceSystem).List(options)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
for _, pod := range pods.Items { for _, pod := range fluentdPods.Items {
if nodeInNodeList(pod.Spec.NodeName, nodes) { if nodeInNodeList(pod.Spec.NodeName, nodes) {
err = waitForPodRunningInNamespace(f.Client, pod.Name, api.NamespaceSystem) err = waitForPodRunningInNamespace(f.Client, pod.Name, api.NamespaceSystem)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -231,7 +231,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
// Check if each healthy node has fluentd running on it // Check if each healthy node has fluentd running on it
for _, node := range nodes.Items { for _, node := range nodes.Items {
exists := false exists := false
for _, pod := range pods.Items { for _, pod := range fluentdPods.Items {
if pod.Spec.NodeName == node.Name { if pod.Spec.NodeName == node.Name {
exists = true exists = true
break break
@ -305,7 +305,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
totalMissing := 0 totalMissing := 0
expected := nodeCount * countTo expected := nodeCount * countTo
missingPerNode := []int{} missingPerNode := []int{}
for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(10 * time.Second) { for start := time.Now(); time.Since(start) < ingestionTimeout; time.Sleep(25 * time.Second) {
// Debugging code to report the status of the elasticsearch logging endpoints. // Debugging code to report the status of the elasticsearch logging endpoints.
selector := labels.Set{k8sAppKey: esValue}.AsSelector() selector := labels.Set{k8sAppKey: esValue}.AsSelector()
@ -346,7 +346,8 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
} }
hits, ok := response["hits"].(map[string]interface{}) hits, ok := response["hits"].(map[string]interface{})
if !ok { if !ok {
Failf("response[hits] not of the expected type: %T", response["hits"]) Logf("response[hits] not of the expected type: %T", response["hits"])
continue
} }
totalF, ok := hits["total"].(float64) totalF, ok := hits["total"].(float64)
if !ok { if !ok {
@ -371,33 +372,41 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
for _, e := range h { for _, e := range h {
l, ok := e.(map[string]interface{}) l, ok := e.(map[string]interface{})
if !ok { if !ok {
Failf("element of hit not of expected type: %T", e) Logf("element of hit not of expected type: %T", e)
continue
} }
source, ok := l["_source"].(map[string]interface{}) source, ok := l["_source"].(map[string]interface{})
if !ok { if !ok {
Failf("_source not of the expected type: %T", l["_source"]) Logf("_source not of the expected type: %T", l["_source"])
continue
} }
msg, ok := source["log"].(string) msg, ok := source["log"].(string)
if !ok { if !ok {
Failf("log not of the expected type: %T", source["log"]) Logf("log not of the expected type: %T", source["log"])
continue
} }
words := strings.Split(msg, " ") words := strings.Split(msg, " ")
if len(words) < 4 { if len(words) != 4 {
Failf("Malformed log line: %s", msg) Logf("Malformed log line: %s", msg)
continue
} }
n, err := strconv.ParseUint(words[0], 10, 0) n, err := strconv.ParseUint(words[0], 10, 0)
if err != nil { if err != nil {
Failf("Expecting numer of node as first field of %s", msg) Logf("Expecting numer of node as first field of %s", msg)
continue
} }
if n < 0 || int(n) >= nodeCount { if n < 0 || int(n) >= nodeCount {
Failf("Node count index out of range: %d", nodeCount) Logf("Node count index out of range: %d", nodeCount)
continue
} }
index, err := strconv.ParseUint(words[2], 10, 0) index, err := strconv.ParseUint(words[2], 10, 0)
if err != nil { if err != nil {
Failf("Expecting number as third field of %s", msg) Logf("Expecting number as third field of %s", msg)
continue
} }
if index < 0 || index >= countTo { if index < 0 || index >= countTo {
Failf("Index value out of range: %d", index) Logf("Index value out of range: %d", index)
continue
} }
// Record the observation of a log line from node n at the given index. // Record the observation of a log line from node n at the given index.
observed[n][index]++ observed[n][index]++
@ -405,6 +414,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
// Make sure we correctly observed the expected log lines from each node. // Make sure we correctly observed the expected log lines from each node.
totalMissing = 0 totalMissing = 0
missingPerNode = make([]int, nodeCount) missingPerNode = make([]int, nodeCount)
incorrectCount := false
for n := range observed { for n := range observed {
for i, c := range observed[n] { for i, c := range observed[n] {
if c == 0 { if c == 0 {
@ -412,10 +422,15 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
missingPerNode[n]++ missingPerNode[n]++
} }
if c < 0 || c > 1 { if c < 0 || c > 1 {
Failf("Got incorrect count for node %d index %d: %d", n, i, c) Logf("Got incorrect count for node %d index %d: %d", n, i, c)
incorrectCount = true
} }
} }
} }
if incorrectCount {
Logf("After %v es still return duplicated log lines", time.Since(start))
continue
}
if totalMissing != 0 { if totalMissing != 0 {
Logf("After %v still missing %d log lines", time.Since(start), totalMissing) Logf("After %v still missing %d log lines", time.Since(start), totalMissing)
continue continue
@ -425,7 +440,7 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
} }
for n := range missingPerNode { for n := range missingPerNode {
if missingPerNode[n] > 0 { if missingPerNode[n] > 0 {
Logf("Node %d is missing %d logs", n, missingPerNode[n]) Logf("Node %d %s is missing %d logs", n, nodes.Items[n].Name, missingPerNode[n])
opts := &api.PodLogOptions{} opts := &api.PodLogOptions{}
body, err = f.Client.Pods(ns).GetLogs(podNames[n], opts).DoRaw() body, err = f.Client.Pods(ns).GetLogs(podNames[n], opts).DoRaw()
if err != nil { if err != nil {
@ -433,6 +448,18 @@ func ClusterLevelLoggingWithElasticsearch(f *Framework) {
continue continue
} }
Logf("Pod %s has the following logs: %s", podNames[n], body) Logf("Pod %s has the following logs: %s", podNames[n], body)
for _, pod := range fluentdPods.Items {
if pod.Spec.NodeName == nodes.Items[n].Name {
body, err = f.Client.Pods(api.NamespaceSystem).GetLogs(pod.Name, opts).DoRaw()
if err != nil {
Logf("Cannot get logs from pod %v", pod.Name)
break
}
Logf("Fluentd Pod %s on node %s has the following logs: %s", pod.Name, nodes.Items[n].Name, body)
break
}
}
} }
} }
Failf("Failed to find all %d log lines", expected) Failf("Failed to find all %d log lines", expected)