Merge pull request #5332 from vishh/heapster_e2e

Adds a Ginkgo version of the monitoring e2e test, replacing the shell-based monitoring test.
Victor Marmol 2015-03-12 08:37:45 -07:00
commit c03b080328
10 changed files with 1088 additions and 136 deletions
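
For readers unfamiliar with the framework, the following is a minimal, self-contained sketch of what a Ginkgo spec like the one this PR adds looks like. It is illustrative only: the package name, the suite bootstrap, the use of Gomega, and the stand-in poll loop are assumptions for the sketch and are not part of this commit (the actual test in test/e2e/monitoring.go uses the suite's own helpers such as Failf and expectNoError).

// monitoring_sketch_test.go: a minimal Ginkgo spec, for illustration only.
package e2e_sketch

import (
	"testing"
	"time"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)

// TestSketch hooks the Ginkgo suite into `go test`.
func TestSketch(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Monitoring sketch suite")
}

var _ = Describe("Monitoring (sketch)", func() {
	BeforeEach(func() {
		// The real test builds a Kubernetes client here (loadClient()).
	})

	It("polls until data shows up or a deadline passes", func() {
		deadline := time.Now().Add(500 * time.Millisecond)
		found := false
		for time.Now().Before(deadline) {
			// The real test queries InfluxDB here; this stand-in succeeds immediately.
			found = true
			break
		}
		Expect(found).To(BeTrue())
	})
})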

Godeps/Godeps.json (generated, 5 changed lines)
View File

@ -272,6 +272,11 @@
"Comment": "0.1.3-8-g6633656",
"Rev": "6633656539c1639d9d78127b7d47c622b5d7b6dc"
},
{
"ImportPath": "github.com/influxdb/influxdb/client",
"Comment": "v0.8.8",
"Rev": "afde71eb1740fd763ab9450e1f700ba0e53c36d0"
},
{
"ImportPath": "github.com/kr/pty",
"Comment": "release.r56-25-g05017fc",

View File

@ -0,0 +1,2 @@
influxdb-go
===========

View File

@ -0,0 +1,200 @@
package examples
import (
"fmt"
"github.com/influxdb/influxdb/client"
)
func main() {
TestClient()
}
func TestClient() {
internalTest(true)
}
func TestClientWithoutCompression() {
internalTest(false)
}
func internalTest(compression bool) {
c, err := client.NewClient(&client.ClientConfig{})
if err != nil {
panic(err)
}
admins, err := c.GetClusterAdminList()
if err != nil {
panic(err)
}
if len(admins) == 1 {
if err := c.CreateClusterAdmin("admin", "password"); err != nil {
panic(err)
}
}
admins, err = c.GetClusterAdminList()
if err != nil {
panic(err)
}
if len(admins) != 2 {
panic("more than two admins returned")
}
dbs, err := c.GetDatabaseList()
if err != nil {
panic(err)
}
if len(dbs) == 0 {
if err := c.CreateDatabase("foobar"); err != nil {
panic(err)
}
}
dbs, err = c.GetDatabaseList()
if err != nil {
panic(err)
}
if len(dbs) != 1 && dbs[0]["foobar"] == nil {
panic("List of databases don't match")
}
users, err := c.GetDatabaseUserList("foobar")
if err != nil {
panic(err)
}
if len(users) == 0 {
if err := c.CreateDatabaseUser("foobar", "dbuser", "pass"); err != nil {
panic(err)
}
if err := c.AlterDatabasePrivilege("foobar", "dbuser", true); err != nil {
panic(err)
}
}
users, err = c.GetDatabaseUserList("foobar")
if err != nil {
panic(err)
}
if len(users) != 1 {
panic("more than one user returned")
}
c, err = client.NewClient(&client.ClientConfig{
Username: "dbuser",
Password: "pass",
Database: "foobar",
})
if !compression {
c.DisableCompression()
}
if err != nil {
panic(err)
}
name := "ts9"
if !compression {
name = "ts9_uncompressed"
}
series := &client.Series{
Name: name,
Columns: []string{"value"},
Points: [][]interface{}{
{1.0},
},
}
if err := c.WriteSeries([]*client.Series{series}); err != nil {
panic(err)
}
result, err := c.Query("select * from " + name)
if err != nil {
panic(err)
}
if len(result) != 1 {
panic(fmt.Errorf("expected one time series returned: %d", len(result)))
}
if len(result[0].Points) != 1 {
panic(fmt.Errorf("Expected one point: %d", len(result[0].Points)))
}
if result[0].Points[0][2].(float64) != 1 {
panic("Value not equal to 1")
}
c, err = client.NewClient(&client.ClientConfig{
Username: "root",
Password: "root",
})
if err != nil {
panic(err)
}
spaces, err := c.GetShardSpaces()
if err != nil || len(spaces) == 0 {
panic(fmt.Errorf("Got empty spaces back: %s", err))
}
if spaces[0].Name != "default" {
panic("Space name isn't default")
}
space := &client.ShardSpace{Name: "foo", Regex: "/^paul_is_rad/"}
err = c.CreateShardSpace("foobar", space)
if err != nil {
panic(err)
}
spaces, _ = c.GetShardSpaces()
if spaces[1].Name != "foo" {
panic("Space name isn't foo")
}
shards, err := c.GetShards()
if err != nil {
panic(fmt.Errorf("Couldn't get shards back: %s", err))
}
c, err = client.NewClient(&client.ClientConfig{
Username: "root",
Password: "root",
Database: "",
})
series = &client.Series{
Name: "paul_is_rad",
Columns: []string{"value"},
Points: [][]interface{}{
{1.0},
},
}
if err := c.WriteSeries([]*client.Series{series}); err != nil {
panic(err)
}
spaces, _ = c.GetShardSpaces()
count := 0
for _, s := range shards.All {
if s.SpaceName == "foo" {
count++
}
}
if err := c.DropShardSpace("foobar", "foo"); err != nil {
panic(fmt.Errorf("Error: %s", err))
}
spaces, err = c.GetShardSpaces()
if err != nil || len(spaces) != 1 || spaces[0].Name != "default" {
panic(fmt.Errorf("Error: %s, %d, %s", err, len(spaces), spaces[0].Name))
}
}

View File

@ -0,0 +1,610 @@
package client
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
)
const (
UDPMaxMessageSize = 2048
)
type Client struct {
host string
username string
password string
database string
httpClient *http.Client
udpConn *net.UDPConn
schema string
compression bool
}
type ClientConfig struct {
Host string
Username string
Password string
Database string
HttpClient *http.Client
IsSecure bool
IsUDP bool
}
var defaults *ClientConfig
func init() {
defaults = &ClientConfig{
Host: "localhost:8086",
Username: "root",
Password: "root",
Database: "",
HttpClient: http.DefaultClient,
}
}
func getDefault(value, defaultValue string) string {
if value == "" {
return defaultValue
}
return value
}
func New(config *ClientConfig) (*Client, error) {
return NewClient(config)
}
func NewClient(config *ClientConfig) (*Client, error) {
host := getDefault(config.Host, defaults.Host)
username := getDefault(config.Username, defaults.Username)
password := getDefault(config.Password, defaults.Password)
database := getDefault(config.Database, defaults.Database)
if config.HttpClient == nil {
config.HttpClient = defaults.HttpClient
}
var udpConn *net.UDPConn
if config.IsUDP {
serverAddr, err := net.ResolveUDPAddr("udp", host)
if err != nil {
return nil, err
}
udpConn, err = net.DialUDP("udp", nil, serverAddr)
if err != nil {
return nil, err
}
}
schema := "http"
if config.IsSecure {
schema = "https"
}
return &Client{host, username, password, database, config.HttpClient, udpConn, schema, false}, nil
}
func (self *Client) DisableCompression() {
self.compression = false
}
func (self *Client) getUrl(path string) string {
return self.getUrlWithUserAndPass(path, self.username, self.password)
}
func (self *Client) getUrlWithUserAndPass(path, username, password string) string {
return fmt.Sprintf("%s://%s%s?u=%s&p=%s", self.schema, self.host, path, username, password)
}
func responseToError(response *http.Response, err error, closeResponse bool) error {
if err != nil {
return err
}
if closeResponse {
defer response.Body.Close()
}
if response.StatusCode >= 200 && response.StatusCode < 300 {
return nil
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return err
}
return fmt.Errorf("Server returned (%d): %s", response.StatusCode, string(body))
}
func (self *Client) CreateDatabase(name string) error {
url := self.getUrl("/db")
payload := map[string]string{"name": name}
data, err := json.Marshal(payload)
if err != nil {
return err
}
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
return responseToError(resp, err, true)
}
func (self *Client) del(url string) (*http.Response, error) {
return self.delWithBody(url, nil)
}
func (self *Client) delWithBody(url string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequest("DELETE", url, body)
if err != nil {
return nil, err
}
return self.httpClient.Do(req)
}
func (self *Client) DeleteDatabase(name string) error {
url := self.getUrl("/db/" + name)
resp, err := self.del(url)
return responseToError(resp, err, true)
}
func (self *Client) get(url string) ([]byte, error) {
resp, err := self.httpClient.Get(url)
err = responseToError(resp, err, false)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
return body, err
}
func (self *Client) getWithVersion(url string) ([]byte, string, error) {
resp, err := self.httpClient.Get(url)
err = responseToError(resp, err, false)
if err != nil {
return nil, "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
version := resp.Header.Get("X-Influxdb-Version")
fields := strings.Fields(version)
if len(fields) > 2 {
return body, fields[1], err
}
return body, "", err
}
func (self *Client) listSomething(url string) ([]map[string]interface{}, error) {
body, err := self.get(url)
if err != nil {
return nil, err
}
somethings := []map[string]interface{}{}
err = json.Unmarshal(body, &somethings)
if err != nil {
return nil, err
}
return somethings, nil
}
func (self *Client) GetDatabaseList() ([]map[string]interface{}, error) {
url := self.getUrl("/db")
return self.listSomething(url)
}
func (self *Client) CreateClusterAdmin(name, password string) error {
url := self.getUrl("/cluster_admins")
payload := map[string]string{"name": name, "password": password}
data, err := json.Marshal(payload)
if err != nil {
return err
}
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
return responseToError(resp, err, true)
}
func (self *Client) UpdateClusterAdmin(name, password string) error {
url := self.getUrl("/cluster_admins/" + name)
payload := map[string]string{"password": password}
data, err := json.Marshal(payload)
if err != nil {
return err
}
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
return responseToError(resp, err, true)
}
func (self *Client) DeleteClusterAdmin(name string) error {
url := self.getUrl("/cluster_admins/" + name)
resp, err := self.del(url)
return responseToError(resp, err, true)
}
func (self *Client) GetClusterAdminList() ([]map[string]interface{}, error) {
url := self.getUrl("/cluster_admins")
return self.listSomething(url)
}
func (self *Client) Servers() ([]map[string]interface{}, error) {
url := self.getUrl("/cluster/servers")
return self.listSomething(url)
}
func (self *Client) RemoveServer(id int) error {
resp, err := self.del(self.getUrl(fmt.Sprintf("/cluster/servers/%d", id)))
return responseToError(resp, err, true)
}
// Creates a new database user for the given database. permissions can
// be omitted in which case the user will be able to read and write to
// all time series. If provided, there should be two strings, the
// first for read and the second for write. The strings are regexes
// that are used to match the time series name to determine whether
// the user has the ability to read/write to the given time series.
//
// client.CreateDatabaseUser("db", "user", "pass")
// // the following user cannot read from any series and can write
// // to the limited time series only
// client.CreateDatabaseUser("db", "limited", "pass", "^$", "limited")
func (self *Client) CreateDatabaseUser(database, name, password string, permissions ...string) error {
readMatcher, writeMatcher := ".*", ".*"
switch len(permissions) {
case 0:
case 2:
readMatcher, writeMatcher = permissions[0], permissions[1]
default:
return fmt.Errorf("You have to provide two ")
}
url := self.getUrl("/db/" + database + "/users")
payload := map[string]string{"name": name, "password": password, "readFrom": readMatcher, "writeTo": writeMatcher}
data, err := json.Marshal(payload)
if err != nil {
return err
}
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
return responseToError(resp, err, true)
}
// Change the cluster admin password
func (self *Client) ChangeClusterAdminPassword(name, newPassword string) error {
url := self.getUrl("/cluster_admins/" + name)
payload := map[string]interface{}{"password": newPassword}
data, err := json.Marshal(payload)
if err != nil {
return err
}
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
return responseToError(resp, err, true)
}
// Change the user password, admin flag and optionally permissions
func (self *Client) ChangeDatabaseUser(database, name, newPassword string, isAdmin bool, newPermissions ...string) error {
switch len(newPermissions) {
case 0, 2:
default:
return fmt.Errorf("You have to provide two ")
}
url := self.getUrl("/db/" + database + "/users/" + name)
payload := map[string]interface{}{"password": newPassword, "admin": isAdmin}
if len(newPermissions) == 2 {
payload["readFrom"] = newPermissions[0]
payload["writeTo"] = newPermissions[1]
}
data, err := json.Marshal(payload)
if err != nil {
return err
}
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
return responseToError(resp, err, true)
}
// See Client.CreateDatabaseUser for more info on the permissions
// argument
func (self *Client) updateDatabaseUserCommon(database, name string, password *string, isAdmin *bool, permissions ...string) error {
url := self.getUrl("/db/" + database + "/users/" + name)
payload := map[string]interface{}{}
if password != nil {
payload["password"] = *password
}
if isAdmin != nil {
payload["admin"] = *isAdmin
}
switch len(permissions) {
case 0:
case 2:
payload["readFrom"] = permissions[0]
payload["writeTo"] = permissions[1]
default:
}
data, err := json.Marshal(payload)
if err != nil {
return err
}
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
return responseToError(resp, err, true)
}
func (self *Client) UpdateDatabaseUser(database, name, password string) error {
return self.updateDatabaseUserCommon(database, name, &password, nil)
}
func (self *Client) UpdateDatabaseUserPermissions(database, name, readPermission, writePermissions string) error {
return self.updateDatabaseUserCommon(database, name, nil, nil, readPermission, writePermissions)
}
func (self *Client) DeleteDatabaseUser(database, name string) error {
url := self.getUrl("/db/" + database + "/users/" + name)
resp, err := self.del(url)
return responseToError(resp, err, true)
}
func (self *Client) GetDatabaseUserList(database string) ([]map[string]interface{}, error) {
url := self.getUrl("/db/" + database + "/users")
return self.listSomething(url)
}
func (self *Client) AlterDatabasePrivilege(database, name string, isAdmin bool, permissions ...string) error {
return self.updateDatabaseUserCommon(database, name, nil, &isAdmin, permissions...)
}
type TimePrecision string
const (
Second TimePrecision = "s"
Millisecond TimePrecision = "ms"
Microsecond TimePrecision = "u"
)
func (self *Client) WriteSeries(series []*Series) error {
return self.writeSeriesCommon(series, nil)
}
func (self *Client) WriteSeriesOverUDP(series []*Series) error {
if self.udpConn == nil {
return fmt.Errorf("UDP isn't enabled. Make sure to set config.IsUDP to true")
}
data, err := json.Marshal(series)
if err != nil {
return err
}
// because the max size of a message over UDP is 2048 bytes
// https://github.com/influxdb/influxdb/blob/master/src/api/udp/api.go#L65
if len(data) >= UDPMaxMessageSize {
err = fmt.Errorf("data size over limit %v limit is %v", len(data), UDPMaxMessageSize)
fmt.Println(err)
return err
}
_, err = self.udpConn.Write(data)
if err != nil {
return err
}
return nil
}
func (self *Client) WriteSeriesWithTimePrecision(series []*Series, timePrecision TimePrecision) error {
return self.writeSeriesCommon(series, map[string]string{"time_precision": string(timePrecision)})
}
func (self *Client) writeSeriesCommon(series []*Series, options map[string]string) error {
data, err := json.Marshal(series)
if err != nil {
return err
}
url := self.getUrl("/db/" + self.database + "/series")
for name, value := range options {
url += fmt.Sprintf("&%s=%s", name, value)
}
var b *bytes.Buffer
if self.compression {
b = bytes.NewBuffer(nil)
w := gzip.NewWriter(b)
if _, err := w.Write(data); err != nil {
return err
}
w.Flush()
w.Close()
} else {
b = bytes.NewBuffer(data)
}
req, err := http.NewRequest("POST", url, b)
if err != nil {
return err
}
if self.compression {
req.Header.Set("Content-Encoding", "gzip")
}
resp, err := self.httpClient.Do(req)
return responseToError(resp, err, true)
}
func (self *Client) Query(query string, precision ...TimePrecision) ([]*Series, error) {
return self.queryCommon(query, false, precision...)
}
func (self *Client) QueryWithNumbers(query string, precision ...TimePrecision) ([]*Series, error) {
return self.queryCommon(query, true, precision...)
}
func (self *Client) queryCommon(query string, useNumber bool, precision ...TimePrecision) ([]*Series, error) {
escapedQuery := url.QueryEscape(query)
url := self.getUrl("/db/" + self.database + "/series")
if len(precision) > 0 {
url += "&time_precision=" + string(precision[0])
}
url += "&q=" + escapedQuery
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
if !self.compression {
req.Header.Set("Accept-Encoding", "identity")
}
resp, err := self.httpClient.Do(req)
err = responseToError(resp, err, false)
if err != nil {
return nil, err
}
defer resp.Body.Close()
series := []*Series{}
decoder := json.NewDecoder(resp.Body)
if useNumber {
decoder.UseNumber()
}
err = decoder.Decode(&series)
if err != nil {
return nil, err
}
return series, nil
}
func (self *Client) Ping() error {
url := self.getUrl("/ping")
resp, err := self.httpClient.Get(url)
return responseToError(resp, err, true)
}
func (self *Client) AuthenticateDatabaseUser(database, username, password string) error {
url := self.getUrlWithUserAndPass(fmt.Sprintf("/db/%s/authenticate", database), username, password)
resp, err := self.httpClient.Get(url)
return responseToError(resp, err, true)
}
func (self *Client) AuthenticateClusterAdmin(username, password string) error {
url := self.getUrlWithUserAndPass("/cluster_admins/authenticate", username, password)
resp, err := self.httpClient.Get(url)
return responseToError(resp, err, true)
}
type LongTermShortTermShards struct {
// Long term shards, (doesn't get populated for version >= 0.8.0)
LongTerm []*Shard `json:"longTerm"`
// Short term shards, (doesn't get populated for version >= 0.8.0)
ShortTerm []*Shard `json:"shortTerm"`
// All shards in the system (Long + Short term shards for version < 0.8.0)
All []*Shard `json:"-"`
}
type Shard struct {
Id uint32 `json:"id"`
EndTime int64 `json:"endTime"`
StartTime int64 `json:"startTime"`
ServerIds []uint32 `json:"serverIds"`
SpaceName string `json:"spaceName"`
Database string `json:"database"`
}
type ShardSpaceCollection struct {
ShardSpaces []ShardSpace
}
func (self *Client) GetShards() (*LongTermShortTermShards, error) {
url := self.getUrlWithUserAndPass("/cluster/shards", self.username, self.password)
body, version, err := self.getWithVersion(url)
if err != nil {
return nil, err
}
return parseShards(body, version)
}
func isOrNewerThan(version, reference string) bool {
if version == "vdev" {
return true
}
majorMinor := strings.Split(version[1:], ".")[:2]
refMajorMinor := strings.Split(reference[1:], ".")[:2]
if majorMinor[0] > refMajorMinor[0] {
return true
}
if majorMinor[1] > refMajorMinor[1] {
return true
}
return majorMinor[1] == refMajorMinor[1]
}
func parseShards(body []byte, version string) (*LongTermShortTermShards, error) {
// strip the initial v in `v0.8.0` and split on the dots
if version != "" && isOrNewerThan(version, "v0.8") {
return parseNewShards(body)
}
shards := &LongTermShortTermShards{}
err := json.Unmarshal(body, &shards)
if err != nil {
return nil, err
}
shards.All = make([]*Shard, len(shards.LongTerm)+len(shards.ShortTerm))
copy(shards.All, shards.LongTerm)
copy(shards.All[len(shards.LongTerm):], shards.ShortTerm)
return shards, nil
}
func parseNewShards(body []byte) (*LongTermShortTermShards, error) {
shards := []*Shard{}
err := json.Unmarshal(body, &shards)
if err != nil {
return nil, err
}
return &LongTermShortTermShards{All: shards}, nil
}
// Added to InfluxDB in 0.8.0
func (self *Client) GetShardSpaces() ([]*ShardSpace, error) {
url := self.getUrlWithUserAndPass("/cluster/shard_spaces", self.username, self.password)
body, err := self.get(url)
if err != nil {
return nil, err
}
spaces := []*ShardSpace{}
err = json.Unmarshal(body, &spaces)
if err != nil {
return nil, err
}
return spaces, nil
}
// Added to InfluxDB in 0.8.0
func (self *Client) DropShardSpace(database, name string) error {
url := self.getUrlWithUserAndPass(fmt.Sprintf("/cluster/shard_spaces/%s/%s", database, name), self.username, self.password)
_, err := self.del(url)
return err
}
// Added to InfluxDB in 0.8.0
func (self *Client) CreateShardSpace(database string, space *ShardSpace) error {
url := self.getUrl(fmt.Sprintf("/cluster/shard_spaces/%s", database))
data, err := json.Marshal(space)
if err != nil {
return err
}
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
return responseToError(resp, err, true)
}
func (self *Client) DropShard(id uint32, serverIds []uint32) error {
url := self.getUrlWithUserAndPass(fmt.Sprintf("/cluster/shards/%d", id), self.username, self.password)
ids := map[string][]uint32{"serverIds": serverIds}
body, err := json.Marshal(ids)
if err != nil {
return err
}
_, err = self.delWithBody(url, bytes.NewBuffer(body))
return err
}
// Added to InfluxDB in 0.8.2
func (self *Client) UpdateShardSpace(database, name string, space *ShardSpace) error {
url := self.getUrl(fmt.Sprintf("/cluster/shard_spaces/%s/%s", database, name))
data, err := json.Marshal(space)
if err != nil {
return err
}
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
return responseToError(resp, err, true)
}
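
As a quick orientation to the vendored client above, here is a minimal usage sketch: create a client, ping the server, write one point, and read it back. The host, credentials, database, and series name are placeholders, error handling is reduced to panics, and a reachable InfluxDB server is assumed; this is not part of the commit.

package main

import (
	"fmt"

	"github.com/influxdb/influxdb/client"
)

func main() {
	// Connect with explicit settings; empty fields fall back to the
	// defaults defined in the client (localhost:8086, root/root).
	c, err := client.NewClient(&client.ClientConfig{
		Host:     "localhost:8086", // placeholder
		Username: "root",
		Password: "root",
		Database: "k8s",
	})
	if err != nil {
		panic(err)
	}
	// Verify the server is reachable before writing anything.
	if err := c.Ping(); err != nil {
		panic(err)
	}
	// Write a single point and read it back.
	s := &client.Series{
		Name:    "example_series", // placeholder
		Columns: []string{"value"},
		Points:  [][]interface{}{{1.0}},
	}
	if err := c.WriteSeries([]*client.Series{s}); err != nil {
		panic(err)
	}
	result, err := c.Query("select * from example_series")
	if err != nil {
		panic(err)
	}
	fmt.Println("series returned:", len(result))
}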

View File

@ -0,0 +1,19 @@
package client
type Series struct {
Name string `json:"name"`
Columns []string `json:"columns"`
Points [][]interface{} `json:"points"`
}
func (self *Series) GetName() string {
return self.Name
}
func (self *Series) GetColumns() []string {
return self.Columns
}
func (self *Series) GetPoints() [][]interface{} {
return self.Points
}

View File

@ -0,0 +1,15 @@
package client
type ShardSpace struct {
// required, must be unique within the database
Name string `json:"name"`
// required, a database has many shard spaces and a shard space belongs to a database
Database string `json:"database"`
// this is optional, if they don't set it, we'll set to /.*/
Regex string `json:"regex"`
// this is optional, if they don't set it, it will default to the storage.dir in the config
RetentionPolicy string `json:"retentionPolicy"`
ShardDuration string `json:"shardDuration"`
ReplicationFactor uint32 `json:"replicationFactor"`
Split uint32 `json:"split"`
}

View File

@ -5,6 +5,5 @@ port: 80
containerPort: 8086
labels:
name: influxdb
kubernetes.io/cluster-service: "true"
selector:
name: influxGrafana

View File

@ -47,7 +47,7 @@ ENABLE_DOCKER_REGISTRY_CACHE=true
ENABLE_NODE_MONITORING="${KUBE_ENABLE_NODE_MONITORING:-true}"
# Optional: When set to true, heapster will be setup as part of the cluster bring up.
ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-false}"
ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-true}"
# Optional: Enable node logging.
ENABLE_NODE_LOGGING="${KUBE_ENABLE_NODE_LOGGING:-true}"

View File

@ -1,134 +0,0 @@
#!/bin/bash
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Assumes a running Kubernetes test cluster; verifies that the monitoring setup
# works. Assumes that we're being called by hack/e2e-test.sh (we use some env
# vars it sets up).
set -o errexit
set -o nounset
set -o pipefail
KUBE_ROOT=$(dirname "${BASH_SOURCE}")/../..
: ${KUBE_VERSION_ROOT:=${KUBE_ROOT}}
: ${KUBECTL:="${KUBE_VERSION_ROOT}/cluster/kubectl.sh"}
: ${KUBE_CONFIG_FILE:="config-test.sh"}
export KUBECTL KUBE_CONFIG_FILE
source "${KUBE_ROOT}/cluster/kube-env.sh"
source "${KUBE_VERSION_ROOT}/cluster/${KUBERNETES_PROVIDER}/util.sh"
prepare-e2e
MONITORING="${KUBE_ROOT}/cluster/addons/cluster-monitoring"
KUBECTL="${KUBE_ROOT}/cluster/kubectl.sh"
BIGRAND=$(printf "%x\n" $(( $RANDOM << 16 | $RANDOM ))) # random 2^32 in hex
MONITORING_FIREWALL_RULE="monitoring-test-${BIGRAND}"
function setup {
# This only has work to do on gce and gke
if [[ "${KUBERNETES_PROVIDER}" == "gce" ]] || [[ "${KUBERNETES_PROVIDER}" == "gke" ]]; then
detect-project
if ! "${GCLOUD}" compute firewall-rules create "${MONITORING_FIREWALL_RULE}" \
--project "${PROJECT}" \
--network "${NETWORK}" \
--quiet \
--allow tcp:80 tcp:8083 tcp:8086 tcp:9200; then
echo "Failed to set up firewall for monitoring" && false
fi
fi
"${KUBECTL}" create -f "${MONITORING}/"
}
function cleanup {
"${KUBECTL}" stop rc monitoring-influx-grafana-controller &> /dev/null || true
"${KUBECTL}" stop rc monitoring-heapster-controller &> /dev/null || true
"${KUBECTL}" delete -f "${MONITORING}/" &> /dev/null || true
# This only has work to do on gce and gke
if [[ "${KUBERNETES_PROVIDER}" == "gce" ]] || [[ "${KUBERNETES_PROVIDER}" == "gke" ]]; then
detect-project
if "${GCLOUD}" compute firewall-rules describe "${MONITORING_FIREWALL_RULE}" &> /dev/null; then
"${GCLOUD}" compute firewall-rules delete \
--project "${PROJECT}" \
--quiet \
"${MONITORING_FIREWALL_RULE}" || true
fi
fi
}
function influx-data-exists {
local max_retries=10
local retry_delay=30 #seconds
local influx_ip=$("${KUBECTL}" get pods -l name=influxGrafana -o template -t {{range.items}}{{.currentState.hostIP}}:{{end}} | sed s/://g)
local influx_url="http://$influx_ip:8086/db/k8s/series?u=root&p=root"
local ok="false"
for i in `seq 1 10`; do
if curl --retry $max_retries --retry-delay $retry_delay -G $influx_url --data-urlencode "q=select * from stats limit 1" \
&& curl --retry $max_retries --retry-delay $retry_delay -G $influx_url --data-urlencode "q=select * from machine limit 1"; then
echo "retrieved data from InfluxDB."
ok="true"
break
fi
sleep 5
done
if [[ "${ok}" != "true" ]]; then
echo "failed to retrieve stats from InfluxDB. monitoring test failed"
exit 1
fi
}
function wait-for-pods {
local running=false
for i in `seq 1 20`; do
sleep 20
if "${KUBECTL}" get pods -l name=influxGrafana -o template -t {{range.items}}{{.currentState.status}}:{{end}} | grep Running &> /dev/null \
&& "${KUBECTL}" get pods -l name=heapster -o template -t {{range.items}}{{.currentState.status}}:{{end}} | grep Running &> /dev/null; then
running=true
break
fi
done
if [ running == false ]; then
echo "giving up waiting on monitoring pods to be active. monitoring test failed"
exit 1
fi
}
trap cleanup EXIT
# Remove any pre-existing monitoring services.
cleanup
# Start monitoring pods and services.
setup
# Wait for a maximum of 5 minutes for the influx grafana pod to be running.
echo "waiting for monitoring pods to be running"
wait-for-pods
# Wait for some time to let heapster push some stats to InfluxDB.
echo "monitoring pods are running. waiting for stats to be pushed to InfluxDB"
sleep 60
# Check if stats data exists in InfluxDB
echo "checking if stats exist in InfluxDB"
influx-data-exists
echo "monitoring setup works"
exit 0

test/e2e/monitoring.go (new file, 236 lines)
View File

@ -0,0 +1,236 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"net/http"
"net/url"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
influxdb "github.com/influxdb/influxdb/client"
. "github.com/onsi/ginkgo"
)
var _ = Describe("Monitoring", func() {
var c *client.Client
BeforeEach(func() {
var err error
c, err = loadClient()
expectNoError(err)
})
It("verify monitoring pods and all cluster nodes are available on influxdb using heapster.", func() {
if testContext.provider != "gce" {
By(fmt.Sprintf("Skipping Monitoring test, which is only supported for provider gce (not %s)",
testContext.provider))
return
}
testMonitoringUsingHeapsterInfluxdb(c)
})
})
const (
influxdbService = "monitoring-influxdb"
influxdbDatabaseName = "k8s"
influxdbUser = "root"
influxdbPW = "root"
podlistQuery = "select distinct(pod) from stats"
nodelistQuery = "select distinct(hostname) from machine"
sleepBetweenAttempts = 5 * time.Second
testTimeout = 5 * time.Minute
)
var (
expectedRcs = map[string]bool{
"monitoring-heapster-controller": false,
"monitoring-influx-grafana-controller": false,
}
expectedServices = map[string]bool{
influxdbService: false,
"monitoring-heapster": false,
"monitoring-grafana": false,
}
)
func verifyExpectedRcsExistAndGetExpectedPods(c *client.Client) ([]string, error) {
rcList, err := c.ReplicationControllers(api.NamespaceDefault).List(labels.Everything())
if err != nil {
return nil, err
}
expectedPods := []string{}
for _, rc := range rcList.Items {
if _, ok := expectedRcs[rc.Name]; ok {
if rc.Status.Replicas != 1 {
return nil, fmt.Errorf("expected to find only one replica for rc %q, found %d", rc.Name, rc.Status.Replicas)
}
expectedRcs[rc.Name] = true
podList, err := c.Pods(api.NamespaceDefault).List(labels.Set(rc.Spec.Selector).AsSelector())
if err != nil {
return nil, err
}
for _, pod := range podList.Items {
expectedPods = append(expectedPods, pod.Name)
}
}
}
for rc, found := range expectedRcs {
if !found {
return nil, fmt.Errorf("Replication Controller %q not found.", rc)
}
}
return expectedPods, nil
}
func expectedServicesExist(c *client.Client) error {
serviceList, err := c.Services(api.NamespaceDefault).List(labels.Everything())
if err != nil {
return err
}
for _, service := range serviceList.Items {
if _, ok := expectedServices[service.Name]; ok {
expectedServices[service.Name] = true
}
}
for service, found := range expectedServices {
if !found {
return fmt.Errorf("Service %q not found", service)
}
}
return nil
}
func getAllNodesInCluster(c *client.Client) ([]string, error) {
nodeList, err := c.Nodes().List()
if err != nil {
return nil, err
}
result := []string{}
for _, node := range nodeList.Items {
result = append(result, node.Name)
}
return result, nil
}
func getInfluxdbData(c *influxdb.Client, query string) (map[string]bool, error) {
series, err := c.Query(query, influxdb.Second)
if err != nil {
return nil, err
}
if len(series) != 1 {
Failf("expected only one series from Influxdb for query %q. Got %+v", query, series)
}
if len(series[0].GetColumns()) != 2 {
Failf("Expected two columns for query %q. Found %v", query, series[0].GetColumns())
}
result := map[string]bool{}
for _, point := range series[0].GetPoints() {
if len(point) != 2 {
Failf("Expected only two entries in a point for query %q. Got %v", query, point)
}
name, ok := point[1].(string)
if !ok {
Failf("expected %v to be a string, but it is %T", point[1], point[1])
}
result[name] = false
}
return result, nil
}
func expectedItemsExist(expectedItems []string, actualItems map[string]bool) bool {
if len(actualItems) < len(expectedItems) {
return false
}
for _, item := range expectedItems {
if _, found := actualItems[item]; !found {
return false
}
}
return true
}
func validatePodsAndNodes(influxdbClient *influxdb.Client, expectedPods, expectedNodes []string) bool {
pods, err := getInfluxdbData(influxdbClient, podlistQuery)
if err != nil {
// We don't fail the test here because the influxdb service might still not be running.
Logf("failed to query list of pods from influxdb. Query: %q, Err: %v", podlistQuery, err)
return false
}
nodes, err := getInfluxdbData(influxdbClient, nodelistQuery)
if err != nil {
Logf("failed to query list of nodes from influxdb. Query: %q, Err: %v", nodelistQuery, err)
return false
}
if !expectedItemsExist(expectedPods, pods) {
Logf("failed to find all expected Pods.\nExpected: %v\nActual: %v", expectedPods, pods)
return false
}
if !expectedItemsExist(expectedNodes, nodes) {
Logf("failed to find all expected Nodes.\nExpected: %v\nActual: %v", expectedNodes, nodes)
return false
}
return true
}
func getMasterHost() string {
masterUrl, err := url.Parse(testContext.host)
expectNoError(err)
return masterUrl.Host
}
func testMonitoringUsingHeapsterInfluxdb(c *client.Client) {
// Check if heapster pods and services are up.
expectedPods, err := verifyExpectedRcsExistAndGetExpectedPods(c)
expectNoError(err)
expectNoError(expectedServicesExist(c))
// TODO: Wait for all pods and services to be running.
kubeMasterHttpClient, ok := c.Client.(*http.Client)
if !ok {
Failf("failed to get master http client")
}
proxyUrl := fmt.Sprintf("%s/api/v1beta1/proxy/services/%s/", getMasterHost(), influxdbService)
config := &influxdb.ClientConfig{
Host: proxyUrl,
// TODO(vishh): Infer username and pw from the Pod spec.
Username: influxdbUser,
Password: influxdbPW,
Database: influxdbDatabaseName,
HttpClient: kubeMasterHttpClient,
IsSecure: true,
}
influxdbClient, err := influxdb.NewClient(config)
expectNoError(err, "failed to create influxdb client")
expectedNodes, err := getAllNodesInCluster(c)
expectNoError(err)
startTime := time.Now()
for {
if validatePodsAndNodes(influxdbClient, expectedPods, expectedNodes) {
return
}
if time.Since(startTime) >= testTimeout {
break
}
time.Sleep(sleepBetweenAttempts)
}
Failf("monitoring using heapster and influxdb test failed")
}
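
The core control flow of the new test is the poll/validate/timeout loop at the end of testMonitoringUsingHeapsterInfluxdb. As a standalone sketch of that pattern under illustrative assumptions (pollUntil and the stand-in check below are not part of the commit):

package main

import (
	"fmt"
	"time"
)

// pollUntil repeatedly calls check until it returns true or the timeout
// elapses, sleeping between attempts. This mirrors the loop in the new test,
// where check corresponds to validatePodsAndNodes.
func pollUntil(check func() bool, interval, timeout time.Duration) bool {
	start := time.Now()
	for {
		if check() {
			return true
		}
		if time.Since(start) >= timeout {
			return false
		}
		time.Sleep(interval)
	}
}

func main() {
	attempts := 0
	ok := pollUntil(func() bool {
		attempts++
		return attempts >= 3 // stand-in for a real validation call
	}, 10*time.Millisecond, time.Second)
	fmt.Println("succeeded:", ok, "after", attempts, "attempts")
}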