mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 07:47:56 +00:00
Adding Influxdb client to godeps. This is required for the new monitoring test.
This commit is contained in:
parent
56d365bdd8
commit
23b2f9268e
5
Godeps/Godeps.json
generated
5
Godeps/Godeps.json
generated
@ -243,6 +243,11 @@
|
||||
"Comment": "0.1.3-8-g6633656",
|
||||
"Rev": "6633656539c1639d9d78127b7d47c622b5d7b6dc"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/influxdb/influxdb/client",
|
||||
"Comment": "v0.8.8",
|
||||
"Rev": "afde71eb1740fd763ab9450e1f700ba0e53c36d0"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/kr/pty",
|
||||
"Comment": "release.r56-25-g05017fc",
|
||||
|
2
Godeps/_workspace/src/github.com/influxdb/influxdb/client/README.md
generated
vendored
Normal file
2
Godeps/_workspace/src/github.com/influxdb/influxdb/client/README.md
generated
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
influxdb-go
|
||||
===========
|
200
Godeps/_workspace/src/github.com/influxdb/influxdb/client/examples/example.go
generated
vendored
Normal file
200
Godeps/_workspace/src/github.com/influxdb/influxdb/client/examples/example.go
generated
vendored
Normal file
@ -0,0 +1,200 @@
|
||||
package examples
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdb/influxdb/client"
|
||||
)
|
||||
|
||||
func main() {
|
||||
TestClient()
|
||||
}
|
||||
|
||||
func TestClient() {
|
||||
internalTest(true)
|
||||
}
|
||||
|
||||
func TestClientWithoutCompression() {
|
||||
internalTest(false)
|
||||
}
|
||||
|
||||
func internalTest(compression bool) {
|
||||
c, err := client.NewClient(&client.ClientConfig{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
admins, err := c.GetClusterAdminList()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(admins) == 1 {
|
||||
if err := c.CreateClusterAdmin("admin", "password"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
admins, err = c.GetClusterAdminList()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(admins) != 2 {
|
||||
panic("more than two admins returned")
|
||||
}
|
||||
|
||||
dbs, err := c.GetDatabaseList()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(dbs) == 0 {
|
||||
if err := c.CreateDatabase("foobar"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
dbs, err = c.GetDatabaseList()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(dbs) != 1 && dbs[0]["foobar"] == nil {
|
||||
panic("List of databases don't match")
|
||||
}
|
||||
|
||||
users, err := c.GetDatabaseUserList("foobar")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(users) == 0 {
|
||||
if err := c.CreateDatabaseUser("foobar", "dbuser", "pass"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := c.AlterDatabasePrivilege("foobar", "dbuser", true); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
users, err = c.GetDatabaseUserList("foobar")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(users) != 1 {
|
||||
panic("more than one user returned")
|
||||
}
|
||||
|
||||
c, err = client.NewClient(&client.ClientConfig{
|
||||
Username: "dbuser",
|
||||
Password: "pass",
|
||||
Database: "foobar",
|
||||
})
|
||||
|
||||
if !compression {
|
||||
c.DisableCompression()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
name := "ts9"
|
||||
if !compression {
|
||||
name = "ts9_uncompressed"
|
||||
}
|
||||
|
||||
series := &client.Series{
|
||||
Name: name,
|
||||
Columns: []string{"value"},
|
||||
Points: [][]interface{}{
|
||||
{1.0},
|
||||
},
|
||||
}
|
||||
if err := c.WriteSeries([]*client.Series{series}); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
result, err := c.Query("select * from " + name)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(result) != 1 {
|
||||
panic(fmt.Errorf("expected one time series returned: %d", len(result)))
|
||||
}
|
||||
|
||||
if len(result[0].Points) != 1 {
|
||||
panic(fmt.Errorf("Expected one point: %d", len(result[0].Points)))
|
||||
}
|
||||
|
||||
if result[0].Points[0][2].(float64) != 1 {
|
||||
panic("Value not equal to 1")
|
||||
}
|
||||
|
||||
c, err = client.NewClient(&client.ClientConfig{
|
||||
Username: "root",
|
||||
Password: "root",
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
spaces, err := c.GetShardSpaces()
|
||||
if err != nil || len(spaces) == 0 {
|
||||
panic(fmt.Errorf("Got empty spaces back: %s", err))
|
||||
}
|
||||
if spaces[0].Name != "default" {
|
||||
panic("Space name isn't default")
|
||||
}
|
||||
space := &client.ShardSpace{Name: "foo", Regex: "/^paul_is_rad/"}
|
||||
err = c.CreateShardSpace("foobar", space)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
spaces, _ = c.GetShardSpaces()
|
||||
if spaces[1].Name != "foo" {
|
||||
panic("Space name isn't foo")
|
||||
}
|
||||
shards, err := c.GetShards()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("Couldn't get shards back: %s", err))
|
||||
}
|
||||
|
||||
c, err = client.NewClient(&client.ClientConfig{
|
||||
Username: "root",
|
||||
Password: "root",
|
||||
Database: "",
|
||||
})
|
||||
series = &client.Series{
|
||||
Name: "paul_is_rad",
|
||||
Columns: []string{"value"},
|
||||
Points: [][]interface{}{
|
||||
{1.0},
|
||||
},
|
||||
}
|
||||
if err := c.WriteSeries([]*client.Series{series}); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
spaces, _ = c.GetShardSpaces()
|
||||
count := 0
|
||||
for _, s := range shards.All {
|
||||
if s.SpaceName == "foo" {
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.DropShardSpace("foobar", "foo"); err != nil {
|
||||
panic(fmt.Errorf("Error: %s", err))
|
||||
}
|
||||
|
||||
spaces, err = c.GetShardSpaces()
|
||||
if err != nil || len(spaces) != 1 || spaces[0].Name != "default" {
|
||||
panic(fmt.Errorf("Error: %s, %d, %s", err, len(spaces), spaces[0].Name))
|
||||
}
|
||||
}
|
610
Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go
generated
vendored
Normal file
610
Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go
generated
vendored
Normal file
@ -0,0 +1,610 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
UDPMaxMessageSize = 2048
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
host string
|
||||
username string
|
||||
password string
|
||||
database string
|
||||
httpClient *http.Client
|
||||
udpConn *net.UDPConn
|
||||
schema string
|
||||
compression bool
|
||||
}
|
||||
|
||||
type ClientConfig struct {
|
||||
Host string
|
||||
Username string
|
||||
Password string
|
||||
Database string
|
||||
HttpClient *http.Client
|
||||
IsSecure bool
|
||||
IsUDP bool
|
||||
}
|
||||
|
||||
var defaults *ClientConfig
|
||||
|
||||
func init() {
|
||||
defaults = &ClientConfig{
|
||||
Host: "localhost:8086",
|
||||
Username: "root",
|
||||
Password: "root",
|
||||
Database: "",
|
||||
HttpClient: http.DefaultClient,
|
||||
}
|
||||
}
|
||||
|
||||
func getDefault(value, defaultValue string) string {
|
||||
if value == "" {
|
||||
return defaultValue
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
func New(config *ClientConfig) (*Client, error) {
|
||||
return NewClient(config)
|
||||
}
|
||||
|
||||
func NewClient(config *ClientConfig) (*Client, error) {
|
||||
host := getDefault(config.Host, defaults.Host)
|
||||
username := getDefault(config.Username, defaults.Username)
|
||||
password := getDefault(config.Password, defaults.Password)
|
||||
database := getDefault(config.Database, defaults.Database)
|
||||
if config.HttpClient == nil {
|
||||
config.HttpClient = defaults.HttpClient
|
||||
}
|
||||
var udpConn *net.UDPConn
|
||||
if config.IsUDP {
|
||||
serverAddr, err := net.ResolveUDPAddr("udp", host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
udpConn, err = net.DialUDP("udp", nil, serverAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
schema := "http"
|
||||
if config.IsSecure {
|
||||
schema = "https"
|
||||
}
|
||||
return &Client{host, username, password, database, config.HttpClient, udpConn, schema, false}, nil
|
||||
}
|
||||
|
||||
func (self *Client) DisableCompression() {
|
||||
self.compression = false
|
||||
}
|
||||
|
||||
func (self *Client) getUrl(path string) string {
|
||||
return self.getUrlWithUserAndPass(path, self.username, self.password)
|
||||
}
|
||||
|
||||
func (self *Client) getUrlWithUserAndPass(path, username, password string) string {
|
||||
return fmt.Sprintf("%s://%s%s?u=%s&p=%s", self.schema, self.host, path, username, password)
|
||||
}
|
||||
|
||||
func responseToError(response *http.Response, err error, closeResponse bool) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if closeResponse {
|
||||
defer response.Body.Close()
|
||||
}
|
||||
if response.StatusCode >= 200 && response.StatusCode < 300 {
|
||||
return nil
|
||||
}
|
||||
defer response.Body.Close()
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("Server returned (%d): %s", response.StatusCode, string(body))
|
||||
}
|
||||
|
||||
func (self *Client) CreateDatabase(name string) error {
|
||||
url := self.getUrl("/db")
|
||||
payload := map[string]string{"name": name}
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) del(url string) (*http.Response, error) {
|
||||
return self.delWithBody(url, nil)
|
||||
}
|
||||
|
||||
func (self *Client) delWithBody(url string, body io.Reader) (*http.Response, error) {
|
||||
req, err := http.NewRequest("DELETE", url, body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return self.httpClient.Do(req)
|
||||
}
|
||||
|
||||
func (self *Client) DeleteDatabase(name string) error {
|
||||
url := self.getUrl("/db/" + name)
|
||||
resp, err := self.del(url)
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) get(url string) ([]byte, error) {
|
||||
resp, err := self.httpClient.Get(url)
|
||||
err = responseToError(resp, err, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
return body, err
|
||||
}
|
||||
|
||||
func (self *Client) getWithVersion(url string) ([]byte, string, error) {
|
||||
resp, err := self.httpClient.Get(url)
|
||||
err = responseToError(resp, err, false)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
version := resp.Header.Get("X-Influxdb-Version")
|
||||
fields := strings.Fields(version)
|
||||
if len(fields) > 2 {
|
||||
return body, fields[1], err
|
||||
}
|
||||
return body, "", err
|
||||
}
|
||||
|
||||
func (self *Client) listSomething(url string) ([]map[string]interface{}, error) {
|
||||
body, err := self.get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
somethings := []map[string]interface{}{}
|
||||
err = json.Unmarshal(body, &somethings)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return somethings, nil
|
||||
}
|
||||
|
||||
func (self *Client) GetDatabaseList() ([]map[string]interface{}, error) {
|
||||
url := self.getUrl("/db")
|
||||
return self.listSomething(url)
|
||||
}
|
||||
|
||||
func (self *Client) CreateClusterAdmin(name, password string) error {
|
||||
url := self.getUrl("/cluster_admins")
|
||||
payload := map[string]string{"name": name, "password": password}
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) UpdateClusterAdmin(name, password string) error {
|
||||
url := self.getUrl("/cluster_admins/" + name)
|
||||
payload := map[string]string{"password": password}
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) DeleteClusterAdmin(name string) error {
|
||||
url := self.getUrl("/cluster_admins/" + name)
|
||||
resp, err := self.del(url)
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) GetClusterAdminList() ([]map[string]interface{}, error) {
|
||||
url := self.getUrl("/cluster_admins")
|
||||
return self.listSomething(url)
|
||||
}
|
||||
|
||||
func (self *Client) Servers() ([]map[string]interface{}, error) {
|
||||
url := self.getUrl("/cluster/servers")
|
||||
return self.listSomething(url)
|
||||
}
|
||||
|
||||
func (self *Client) RemoveServer(id int) error {
|
||||
resp, err := self.del(self.getUrl(fmt.Sprintf("/cluster/servers/%d", id)))
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
// Creates a new database user for the given database. permissions can
|
||||
// be omitted in which case the user will be able to read and write to
|
||||
// all time series. If provided, there should be two strings, the
|
||||
// first for read and the second for write. The strings are regexes
|
||||
// that are used to match the time series name to determine whether
|
||||
// the user has the ability to read/write to the given time series.
|
||||
//
|
||||
// client.CreateDatabaseUser("db", "user", "pass")
|
||||
// // the following user cannot read from any series and can write
|
||||
// // to the limited time series only
|
||||
// client.CreateDatabaseUser("db", "limited", "pass", "^$", "limited")
|
||||
func (self *Client) CreateDatabaseUser(database, name, password string, permissions ...string) error {
|
||||
readMatcher, writeMatcher := ".*", ".*"
|
||||
switch len(permissions) {
|
||||
case 0:
|
||||
case 2:
|
||||
readMatcher, writeMatcher = permissions[0], permissions[1]
|
||||
default:
|
||||
return fmt.Errorf("You have to provide two ")
|
||||
}
|
||||
|
||||
url := self.getUrl("/db/" + database + "/users")
|
||||
payload := map[string]string{"name": name, "password": password, "readFrom": readMatcher, "writeTo": writeMatcher}
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
// Change the cluster admin password
|
||||
func (self *Client) ChangeClusterAdminPassword(name, newPassword string) error {
|
||||
url := self.getUrl("/cluster_admins/" + name)
|
||||
payload := map[string]interface{}{"password": newPassword}
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
// Change the user password, adming flag and optionally permissions
|
||||
func (self *Client) ChangeDatabaseUser(database, name, newPassword string, isAdmin bool, newPermissions ...string) error {
|
||||
switch len(newPermissions) {
|
||||
case 0, 2:
|
||||
default:
|
||||
return fmt.Errorf("You have to provide two ")
|
||||
}
|
||||
|
||||
url := self.getUrl("/db/" + database + "/users/" + name)
|
||||
payload := map[string]interface{}{"password": newPassword, "admin": isAdmin}
|
||||
if len(newPermissions) == 2 {
|
||||
payload["readFrom"] = newPermissions[0]
|
||||
payload["writeTo"] = newPermissions[1]
|
||||
}
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
// See Client.CreateDatabaseUser for more info on the permissions
|
||||
// argument
|
||||
func (self *Client) updateDatabaseUserCommon(database, name string, password *string, isAdmin *bool, permissions ...string) error {
|
||||
url := self.getUrl("/db/" + database + "/users/" + name)
|
||||
payload := map[string]interface{}{}
|
||||
if password != nil {
|
||||
payload["password"] = *password
|
||||
}
|
||||
if isAdmin != nil {
|
||||
payload["admin"] = *isAdmin
|
||||
}
|
||||
switch len(permissions) {
|
||||
case 0:
|
||||
case 2:
|
||||
payload["readFrom"] = permissions[0]
|
||||
payload["writeTo"] = permissions[1]
|
||||
default:
|
||||
}
|
||||
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) UpdateDatabaseUser(database, name, password string) error {
|
||||
return self.updateDatabaseUserCommon(database, name, &password, nil)
|
||||
}
|
||||
|
||||
func (self *Client) UpdateDatabaseUserPermissions(database, name, readPermission, writePermissions string) error {
|
||||
return self.updateDatabaseUserCommon(database, name, nil, nil, readPermission, writePermissions)
|
||||
}
|
||||
|
||||
func (self *Client) DeleteDatabaseUser(database, name string) error {
|
||||
url := self.getUrl("/db/" + database + "/users/" + name)
|
||||
resp, err := self.del(url)
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) GetDatabaseUserList(database string) ([]map[string]interface{}, error) {
|
||||
url := self.getUrl("/db/" + database + "/users")
|
||||
return self.listSomething(url)
|
||||
}
|
||||
|
||||
func (self *Client) AlterDatabasePrivilege(database, name string, isAdmin bool, permissions ...string) error {
|
||||
return self.updateDatabaseUserCommon(database, name, nil, &isAdmin, permissions...)
|
||||
}
|
||||
|
||||
type TimePrecision string
|
||||
|
||||
const (
|
||||
Second TimePrecision = "s"
|
||||
Millisecond TimePrecision = "ms"
|
||||
Microsecond TimePrecision = "u"
|
||||
)
|
||||
|
||||
func (self *Client) WriteSeries(series []*Series) error {
|
||||
return self.writeSeriesCommon(series, nil)
|
||||
}
|
||||
|
||||
func (self *Client) WriteSeriesOverUDP(series []*Series) error {
|
||||
if self.udpConn == nil {
|
||||
return fmt.Errorf("UDP isn't enabled. Make sure to set config.IsUDP to true")
|
||||
}
|
||||
|
||||
data, err := json.Marshal(series)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// because max of msg over upd is 2048 bytes
|
||||
// https://github.com/influxdb/influxdb/blob/master/src/api/udp/api.go#L65
|
||||
if len(data) >= UDPMaxMessageSize {
|
||||
err = fmt.Errorf("data size over limit %v limit is %v", len(data), UDPMaxMessageSize)
|
||||
fmt.Println(err)
|
||||
return err
|
||||
}
|
||||
_, err = self.udpConn.Write(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *Client) WriteSeriesWithTimePrecision(series []*Series, timePrecision TimePrecision) error {
|
||||
return self.writeSeriesCommon(series, map[string]string{"time_precision": string(timePrecision)})
|
||||
}
|
||||
|
||||
func (self *Client) writeSeriesCommon(series []*Series, options map[string]string) error {
|
||||
data, err := json.Marshal(series)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
url := self.getUrl("/db/" + self.database + "/series")
|
||||
for name, value := range options {
|
||||
url += fmt.Sprintf("&%s=%s", name, value)
|
||||
}
|
||||
var b *bytes.Buffer
|
||||
if self.compression {
|
||||
b = bytes.NewBuffer(nil)
|
||||
w := gzip.NewWriter(b)
|
||||
if _, err := w.Write(data); err != nil {
|
||||
return err
|
||||
}
|
||||
w.Flush()
|
||||
w.Close()
|
||||
} else {
|
||||
b = bytes.NewBuffer(data)
|
||||
}
|
||||
req, err := http.NewRequest("POST", url, b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if self.compression {
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
}
|
||||
resp, err := self.httpClient.Do(req)
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) Query(query string, precision ...TimePrecision) ([]*Series, error) {
|
||||
return self.queryCommon(query, false, precision...)
|
||||
}
|
||||
|
||||
func (self *Client) QueryWithNumbers(query string, precision ...TimePrecision) ([]*Series, error) {
|
||||
return self.queryCommon(query, true, precision...)
|
||||
}
|
||||
|
||||
func (self *Client) queryCommon(query string, useNumber bool, precision ...TimePrecision) ([]*Series, error) {
|
||||
escapedQuery := url.QueryEscape(query)
|
||||
url := self.getUrl("/db/" + self.database + "/series")
|
||||
if len(precision) > 0 {
|
||||
url += "&time_precision=" + string(precision[0])
|
||||
}
|
||||
url += "&q=" + escapedQuery
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !self.compression {
|
||||
req.Header.Set("Accept-Encoding", "identity")
|
||||
}
|
||||
resp, err := self.httpClient.Do(req)
|
||||
err = responseToError(resp, err, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
series := []*Series{}
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
if useNumber {
|
||||
decoder.UseNumber()
|
||||
}
|
||||
err = decoder.Decode(&series)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return series, nil
|
||||
}
|
||||
|
||||
func (self *Client) Ping() error {
|
||||
url := self.getUrl("/ping")
|
||||
resp, err := self.httpClient.Get(url)
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) AuthenticateDatabaseUser(database, username, password string) error {
|
||||
url := self.getUrlWithUserAndPass(fmt.Sprintf("/db/%s/authenticate", database), username, password)
|
||||
resp, err := self.httpClient.Get(url)
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) AuthenticateClusterAdmin(username, password string) error {
|
||||
url := self.getUrlWithUserAndPass("/cluster_admins/authenticate", username, password)
|
||||
resp, err := self.httpClient.Get(url)
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
type LongTermShortTermShards struct {
|
||||
// Long term shards, (doesn't get populated for version >= 0.8.0)
|
||||
LongTerm []*Shard `json:"longTerm"`
|
||||
// Short term shards, (doesn't get populated for version >= 0.8.0)
|
||||
ShortTerm []*Shard `json:"shortTerm"`
|
||||
// All shards in the system (Long + Short term shards for version < 0.8.0)
|
||||
All []*Shard `json:"-"`
|
||||
}
|
||||
|
||||
type Shard struct {
|
||||
Id uint32 `json:"id"`
|
||||
EndTime int64 `json:"endTime"`
|
||||
StartTime int64 `json:"startTime"`
|
||||
ServerIds []uint32 `json:"serverIds"`
|
||||
SpaceName string `json:"spaceName"`
|
||||
Database string `json:"database"`
|
||||
}
|
||||
|
||||
type ShardSpaceCollection struct {
|
||||
ShardSpaces []ShardSpace
|
||||
}
|
||||
|
||||
func (self *Client) GetShards() (*LongTermShortTermShards, error) {
|
||||
url := self.getUrlWithUserAndPass("/cluster/shards", self.username, self.password)
|
||||
body, version, err := self.getWithVersion(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return parseShards(body, version)
|
||||
}
|
||||
|
||||
func isOrNewerThan(version, reference string) bool {
|
||||
if version == "vdev" {
|
||||
return true
|
||||
}
|
||||
majorMinor := strings.Split(version[1:], ".")[:2]
|
||||
refMajorMinor := strings.Split(reference[1:], ".")[:2]
|
||||
if majorMinor[0] > refMajorMinor[0] {
|
||||
return true
|
||||
}
|
||||
if majorMinor[1] > refMajorMinor[1] {
|
||||
return true
|
||||
}
|
||||
return majorMinor[1] == refMajorMinor[1]
|
||||
}
|
||||
|
||||
func parseShards(body []byte, version string) (*LongTermShortTermShards, error) {
|
||||
// strip the initial v in `v0.8.0` and split on the dots
|
||||
if version != "" && isOrNewerThan(version, "v0.8") {
|
||||
return parseNewShards(body)
|
||||
}
|
||||
shards := &LongTermShortTermShards{}
|
||||
err := json.Unmarshal(body, &shards)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
shards.All = make([]*Shard, len(shards.LongTerm)+len(shards.ShortTerm))
|
||||
copy(shards.All, shards.LongTerm)
|
||||
copy(shards.All[len(shards.LongTerm):], shards.ShortTerm)
|
||||
return shards, nil
|
||||
}
|
||||
|
||||
func parseNewShards(body []byte) (*LongTermShortTermShards, error) {
|
||||
shards := []*Shard{}
|
||||
err := json.Unmarshal(body, &shards)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &LongTermShortTermShards{All: shards}, nil
|
||||
}
|
||||
|
||||
// Added to InfluxDB in 0.8.0
|
||||
func (self *Client) GetShardSpaces() ([]*ShardSpace, error) {
|
||||
url := self.getUrlWithUserAndPass("/cluster/shard_spaces", self.username, self.password)
|
||||
body, err := self.get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
spaces := []*ShardSpace{}
|
||||
err = json.Unmarshal(body, &spaces)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return spaces, nil
|
||||
}
|
||||
|
||||
// Added to InfluxDB in 0.8.0
|
||||
func (self *Client) DropShardSpace(database, name string) error {
|
||||
url := self.getUrlWithUserAndPass(fmt.Sprintf("/cluster/shard_spaces/%s/%s", database, name), self.username, self.password)
|
||||
_, err := self.del(url)
|
||||
return err
|
||||
}
|
||||
|
||||
// Added to InfluxDB in 0.8.0
|
||||
func (self *Client) CreateShardSpace(database string, space *ShardSpace) error {
|
||||
url := self.getUrl(fmt.Sprintf("/cluster/shard_spaces/%s", database))
|
||||
data, err := json.Marshal(space)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
||||
return responseToError(resp, err, true)
|
||||
}
|
||||
|
||||
func (self *Client) DropShard(id uint32, serverIds []uint32) error {
|
||||
url := self.getUrlWithUserAndPass(fmt.Sprintf("/cluster/shards/%d", id), self.username, self.password)
|
||||
ids := map[string][]uint32{"serverIds": serverIds}
|
||||
body, err := json.Marshal(ids)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = self.delWithBody(url, bytes.NewBuffer(body))
|
||||
return err
|
||||
}
|
||||
|
||||
// Added to InfluxDB in 0.8.2
|
||||
func (self *Client) UpdateShardSpace(database, name string, space *ShardSpace) error {
|
||||
url := self.getUrl(fmt.Sprintf("/cluster/shard_spaces/%s/%s", database, name))
|
||||
data, err := json.Marshal(space)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
|
||||
return responseToError(resp, err, true)
|
||||
}
|
19
Godeps/_workspace/src/github.com/influxdb/influxdb/client/series.go
generated
vendored
Normal file
19
Godeps/_workspace/src/github.com/influxdb/influxdb/client/series.go
generated
vendored
Normal file
@ -0,0 +1,19 @@
|
||||
package client
|
||||
|
||||
type Series struct {
|
||||
Name string `json:"name"`
|
||||
Columns []string `json:"columns"`
|
||||
Points [][]interface{} `json:"points"`
|
||||
}
|
||||
|
||||
func (self *Series) GetName() string {
|
||||
return self.Name
|
||||
}
|
||||
|
||||
func (self *Series) GetColumns() []string {
|
||||
return self.Columns
|
||||
}
|
||||
|
||||
func (self *Series) GetPoints() [][]interface{} {
|
||||
return self.Points
|
||||
}
|
15
Godeps/_workspace/src/github.com/influxdb/influxdb/client/shard_space.go
generated
vendored
Normal file
15
Godeps/_workspace/src/github.com/influxdb/influxdb/client/shard_space.go
generated
vendored
Normal file
@ -0,0 +1,15 @@
|
||||
package client
|
||||
|
||||
type ShardSpace struct {
|
||||
// required, must be unique within the database
|
||||
Name string `json:"name"`
|
||||
// required, a database has many shard spaces and a shard space belongs to a database
|
||||
Database string `json:"database"`
|
||||
// this is optional, if they don't set it, we'll set to /.*/
|
||||
Regex string `json:"regex"`
|
||||
// this is optional, if they don't set it, it will default to the storage.dir in the config
|
||||
RetentionPolicy string `json:"retentionPolicy"`
|
||||
ShardDuration string `json:"shardDuration"`
|
||||
ReplicationFactor uint32 `json:"replicationFactor"`
|
||||
Split uint32 `json:"split"`
|
||||
}
|
Loading…
Reference in New Issue
Block a user