Revert "Merge pull request #20215 from kubernetes/revert-20156-bump-influxdb"

This reverts commit 06996d5af9, reversing
changes made to b2f6e7d08f.
This commit is contained in:
Piotr Szczesniak
2016-01-29 09:19:03 +01:00
parent 6e6974a38f
commit 0c5cc1bde1
109 changed files with 36182 additions and 987 deletions

View File

@@ -17,6 +17,8 @@ limitations under the License.
package e2e
import (
"bytes"
"encoding/json"
"fmt"
"net/url"
"time"
@@ -49,10 +51,8 @@ var _ = Describe("[Flaky] Monitoring", func() {
const (
influxdbService = "monitoring-influxdb"
influxdbDatabaseName = "k8s"
influxdbUser = "root"
influxdbPW = "root"
podlistQuery = "select distinct(pod_id) from \"cpu/usage_ns_cumulative\""
nodelistQuery = "select distinct(hostname) from \"cpu/usage_ns_cumulative\""
podlistQuery = "show tag values from \"cpu/usage\" with key = pod_id"
nodelistQuery = "show tag values from \"cpu/usage\" with key = hostname"
sleepBetweenAttempts = 5 * time.Second
testTimeout = 5 * time.Minute
)
@@ -65,6 +65,35 @@ var (
}
)
// Query sends a command to the server and returns the Response
func Query(c *client.Client, query string) (*influxdb.Response, error) {
result, err := c.Get().
Prefix("proxy").
Namespace("kube-system").
Resource("services").
Name(influxdbService+":api").
Suffix("query").
Param("q", query).
Param("db", influxdbDatabaseName).
Param("epoch", "s").
Do().
Raw()
if err != nil {
return nil, err
}
var response influxdb.Response
dec := json.NewDecoder(bytes.NewReader(result))
dec.UseNumber()
err = dec.Decode(&response)
if err != nil {
return nil, err
}
return &response, nil
}
func verifyExpectedRcsExistAndGetExpectedPods(c *client.Client) ([]string, error) {
expectedPods := []string{}
// Iterate over the labels that identify the replication controllers that we
@@ -135,41 +164,24 @@ func getAllNodesInCluster(c *client.Client) ([]string, error) {
return result, nil
}
func getInfluxdbClient(c *client.Client) (*influxdb.Client, error) {
proxyUrl := fmt.Sprintf("%s/api/v1/proxy/namespaces/%s/services/%s:api/", getMasterHost(), api.NamespaceSystem, influxdbService)
config := &influxdb.ClientConfig{
Host: proxyUrl,
// TODO(vishh): Infer username and pw from the Pod spec.
Username: influxdbUser,
Password: influxdbPW,
Database: influxdbDatabaseName,
HttpClient: c.Client,
IsSecure: true,
}
return influxdb.NewClient(config)
}
func getInfluxdbData(c *influxdb.Client, query string) (map[string]bool, error) {
series, err := c.Query(query, influxdb.Second)
func getInfluxdbData(c *client.Client, query string, tag string) (map[string]bool, error) {
response, err := Query(c, query)
if err != nil {
return nil, err
}
if len(series) != 1 {
return nil, fmt.Errorf("expected only one series from Influxdb for query %q. Got %+v", query, series)
if len(response.Results) != 1 {
return nil, fmt.Errorf("expected only one result from Influxdb for query %q. Got %+v", query, response)
}
if len(series[0].GetColumns()) != 2 {
Failf("Expected two columns for query %q. Found %v", query, series[0].GetColumns())
if len(response.Results[0].Series) != 1 {
return nil, fmt.Errorf("expected exactly one series for query %q.", query)
}
if len(response.Results[0].Series[0].Columns) != 1 {
Failf("Expected one column for query %q. Found %v", query, response.Results[0].Series[0].Columns)
}
result := map[string]bool{}
for _, point := range series[0].GetPoints() {
if len(point) != 2 {
Failf("Expected only two entries in a point for query %q. Got %v", query, point)
}
name, ok := point[1].(string)
if !ok {
Failf("expected %v to be a string, but it is %T", point[1], point[1])
}
result[name] = false
for _, value := range response.Results[0].Series[0].Values {
name := value[0].(string)
result[name] = true
}
return result, nil
}
@@ -186,14 +198,14 @@ func expectedItemsExist(expectedItems []string, actualItems map[string]bool) boo
return true
}
func validatePodsAndNodes(influxdbClient *influxdb.Client, expectedPods, expectedNodes []string) bool {
pods, err := getInfluxdbData(influxdbClient, podlistQuery)
func validatePodsAndNodes(c *client.Client, expectedPods, expectedNodes []string) bool {
pods, err := getInfluxdbData(c, podlistQuery, "pod_id")
if err != nil {
// We don't fail the test here because the influxdb service might still not be running.
Logf("failed to query list of pods from influxdb. Query: %q, Err: %v", podlistQuery, err)
return false
}
nodes, err := getInfluxdbData(influxdbClient, nodelistQuery)
nodes, err := getInfluxdbData(c, nodelistQuery, "hostname")
if err != nil {
Logf("failed to query list of nodes from influxdb. Query: %q, Err: %v", nodelistQuery, err)
return false
@@ -222,14 +234,11 @@ func testMonitoringUsingHeapsterInfluxdb(c *client.Client) {
expectNoError(expectedServicesExist(c))
// TODO: Wait for all pods and services to be running.
influxdbClient, err := getInfluxdbClient(c)
expectNoError(err, "failed to create influxdb client")
expectedNodes, err := getAllNodesInCluster(c)
expectNoError(err)
startTime := time.Now()
for {
if validatePodsAndNodes(influxdbClient, expectedPods, expectedNodes) {
if validatePodsAndNodes(c, expectedPods, expectedNodes) {
return
}
if time.Since(startTime) >= testTimeout {