diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index ef9c747995c..2805f24ef52 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -690,6 +690,10 @@ "ImportPath": "github.com/hashicorp/raft-boltdb", "Rev": "d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee" }, + { + "ImportPath": "github.com/hawkular/hawkular-client-go/metrics", + "Rev": "1d46ce7e1eca635f372357a8ccbf1fa7cc28b7d2" + }, { "ImportPath": "github.com/imdario/mergo", "Comment": "0.1.3-8-g6633656", diff --git a/Godeps/LICENSES.md b/Godeps/LICENSES.md index 448927f8de5..5becf6937c6 100644 --- a/Godeps/LICENSES.md +++ b/Godeps/LICENSES.md @@ -53,6 +53,7 @@ github.com/gorilla/mux | spdxBSD3 github.com/hashicorp/go-msgpack | spdxBSD3 github.com/hashicorp/raft | IntelPart08 github.com/hashicorp/raft-boltdb | IntelPart08 +github.com/hawkular/hawkular-client-go | Apache-2 github.com/imdario/mergo | spdxBSD3 github.com/inconshreveable/mousetrap | Apache-2 github.com/influxdb/influxdb | MITname diff --git a/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/LICENSE b/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/LICENSE new file mode 100644 index 00000000000..8f71f43fee3 --- /dev/null +++ b/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/LICENSE @@ -0,0 +1,202 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/metrics/client.go b/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/metrics/client.go new file mode 100644 index 00000000000..63ed211f85d --- /dev/null +++ b/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/metrics/client.go @@ -0,0 +1,583 @@ +package metrics + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" +) + +// TODO Instrumentation? To get statistics? + +// More detailed error +type HawkularClientError struct { + msg string + Code int +} + +func (self *HawkularClientError) Error() string { + return fmt.Sprintf("Hawkular returned status code %d, error message: %s", self.Code, self.msg) +} + +// Client creation and instance config + +const ( + base_url string = "hawkular/metrics" + timeout time.Duration = time.Duration(30 * time.Second) +) + +type Parameters struct { + Tenant string // Technically optional, but requires setting Tenant() option everytime + Url string + TLSConfig *tls.Config + Token string +} + +type Client struct { + Tenant string + url *url.URL + client *http.Client + Token string +} + +type HawkularClient interface { + Send(*http.Request) (*http.Response, error) +} + +// Modifiers + +type Modifier func(*http.Request) error + +// Override function to replace the Tenant (defaults to Client default) +func Tenant(tenant string) Modifier { + return func(r *http.Request) error { + r.Header.Set("Hawkular-Tenant", tenant) + return nil + } +} + +// Add payload to the request +func Data(data interface{}) Modifier { + return func(r *http.Request) error { + jsonb, err := json.Marshal(data) + if err != nil { + return err + } + + b := bytes.NewBuffer(jsonb) + rc := ioutil.NopCloser(b) + r.Body = rc + + // fmt.Printf("Sending: %s\n", string(jsonb)) + + if b != nil { + r.ContentLength = int64(b.Len()) + } + return nil + } +} + +func (self *Client) Url(method string, e ...Endpoint) Modifier { + // TODO Create composite URLs? Add().Add().. etc? Easier to modify on the fly.. + return func(r *http.Request) error { + u := self.createUrl(e...) + r.URL = u + r.Method = method + return nil + } +} + +// Filters for querying + +type Filter func(r *http.Request) + +func Filters(f ...Filter) Modifier { + return func(r *http.Request) error { + for _, filter := range f { + filter(r) + } + return nil // Or should filter return err? + } +} + +// Add query parameters +func Param(k string, v string) Filter { + return func(r *http.Request) { + q := r.URL.Query() + q.Set(k, v) + r.URL.RawQuery = q.Encode() + } +} + +func TypeFilter(t MetricType) Filter { + return Param("type", t.shortForm()) +} + +func TagsFilter(t map[string]string) Filter { + j := tagsEncoder(t) + return Param("tags", j) +} + +// Requires HWKMETRICS-233 +func IdFilter(regexp string) Filter { + return Param("id", regexp) +} + +func StartTimeFilter(startTime time.Time) Filter { + return Param("start", strconv.Itoa(int(startTime.Unix()))) +} + +func EndTimeFilter(endTime time.Time) Filter { + return Param("end", strconv.Itoa(int(endTime.Unix()))) +} + +func BucketsFilter(buckets int) Filter { + return Param("buckets", strconv.Itoa(buckets)) +} + +func PercentilesFilter(percentiles []float64) Filter { + s := make([]string, 0, len(percentiles)) + for _, v := range percentiles { + s = append(s, fmt.Sprintf("%v", v)) + } + j := strings.Join(s, ",") + return Param("percentiles", j) +} + +// The SEND method.. + +func (self *Client) createRequest() *http.Request { + req := &http.Request{ + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Header: make(http.Header), + Host: self.url.Host, + } + req.Header.Add("Content-Type", "application/json") + req.Header.Add("Hawkular-Tenant", self.Tenant) + + if len(self.Token) > 0 { + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", self.Token)) + } + + return req +} + +func (self *Client) Send(o ...Modifier) (*http.Response, error) { + // Initialize + r := self.createRequest() + + // Run all the modifiers + for _, f := range o { + err := f(r) + if err != nil { + return nil, err + } + } + + return self.client.Do(r) +} + +// Commands + +func prepend(slice []Modifier, a ...Modifier) []Modifier { + p := make([]Modifier, 0, len(slice)+len(a)) + p = append(p, a...) + p = append(p, slice...) + return p +} + +// Create new Definition +func (self *Client) Create(md MetricDefinition, o ...Modifier) (bool, error) { + // Keep the order, add custom prepend + o = prepend(o, self.Url("POST", TypeEndpoint(md.Type)), Data(md)) + + r, err := self.Send(o...) + if err != nil { + return false, err + } + + defer r.Body.Close() + + if r.StatusCode > 399 { + err = self.parseErrorResponse(r) + if err, ok := err.(*HawkularClientError); ok { + if err.Code != http.StatusConflict { + return false, err + } else { + return false, nil + } + } + return false, err + } + return true, nil +} + +// Fetch definitions +func (self *Client) Definitions(o ...Modifier) ([]*MetricDefinition, error) { + o = prepend(o, self.Url("GET", TypeEndpoint(Generic))) + + r, err := self.Send(o...) + if err != nil { + return nil, err + } + + defer r.Body.Close() + + if r.StatusCode == http.StatusOK { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + md := []*MetricDefinition{} + if b != nil { + if err = json.Unmarshal(b, &md); err != nil { + return nil, err + } + } + return md, err + } else if r.StatusCode > 399 { + return nil, self.parseErrorResponse(r) + } + + return nil, nil +} + +// Return a single definition +func (self *Client) Definition(t MetricType, id string, o ...Modifier) (*MetricDefinition, error) { + o = prepend(o, self.Url("GET", TypeEndpoint(t), SingleMetricEndpoint(id))) + + r, err := self.Send(o...) + if err != nil { + return nil, err + } + + defer r.Body.Close() + + if r.StatusCode == http.StatusOK { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + md := &MetricDefinition{} + if b != nil { + if err = json.Unmarshal(b, md); err != nil { + return nil, err + } + } + return md, err + } else if r.StatusCode > 399 { + return nil, self.parseErrorResponse(r) + } + + return nil, nil +} + +// Update tags +func (self *Client) UpdateTags(t MetricType, id string, tags map[string]string, o ...Modifier) error { + o = prepend(o, self.Url("PUT", TypeEndpoint(t), SingleMetricEndpoint(id), TagEndpoint()), Data(tags)) + + r, err := self.Send(o...) + if err != nil { + return err + } + + defer r.Body.Close() + + if r.StatusCode > 399 { + return self.parseErrorResponse(r) + } + + return nil +} + +// Delete given tags from the definition +func (self *Client) DeleteTags(t MetricType, id string, tags map[string]string, o ...Modifier) error { + o = prepend(o, self.Url("DELETE", TypeEndpoint(t), SingleMetricEndpoint(id), TagEndpoint(), TagsEndpoint(tags))) + + r, err := self.Send(o...) + if err != nil { + return err + } + + defer r.Body.Close() + + if r.StatusCode > 399 { + return self.parseErrorResponse(r) + } + + return nil +} + +// Fetch metric definition tags +func (self *Client) Tags(t MetricType, id string, o ...Modifier) (map[string]string, error) { + o = prepend(o, self.Url("GET", TypeEndpoint(t), SingleMetricEndpoint(id), TagEndpoint())) + + r, err := self.Send(o...) + if err != nil { + return nil, err + } + + defer r.Body.Close() + + if r.StatusCode == http.StatusOK { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + tags := make(map[string]string) + if b != nil { + if err = json.Unmarshal(b, &tags); err != nil { + return nil, err + } + } + return tags, nil + } else if r.StatusCode > 399 { + return nil, self.parseErrorResponse(r) + } + + return nil, nil +} + +// Write datapoints to the server +func (self *Client) Write(metrics []MetricHeader, o ...Modifier) error { + if len(metrics) > 0 { + mHs := make(map[MetricType][]MetricHeader) + for _, m := range metrics { + if _, found := mHs[m.Type]; !found { + mHs[m.Type] = make([]MetricHeader, 0, 1) + } + mHs[m.Type] = append(mHs[m.Type], m) + } + + wg := &sync.WaitGroup{} + errorsChan := make(chan error, len(mHs)) + + for k, v := range mHs { + wg.Add(1) + go func(k MetricType, v []MetricHeader) { + defer wg.Done() + + // Should be sorted and splitted by type & tenant.. + on := o + on = prepend(on, self.Url("POST", TypeEndpoint(k), DataEndpoint()), Data(v)) + + r, err := self.Send(on...) + if err != nil { + errorsChan <- err + return + } + + defer r.Body.Close() + + if r.StatusCode > 399 { + errorsChan <- self.parseErrorResponse(r) + } + }(k, v) + } + wg.Wait() + select { + case err, ok := <-errorsChan: + if ok { + return err + } + // If channel is closed, we're done + default: + // Nothing to do + } + + } + return nil +} + +// Read data from the server +func (self *Client) ReadMetric(t MetricType, id string, o ...Modifier) ([]*Datapoint, error) { + o = prepend(o, self.Url("GET", TypeEndpoint(t), SingleMetricEndpoint(id), DataEndpoint())) + + r, err := self.Send(o...) + if err != nil { + return nil, err + } + + defer r.Body.Close() + + if r.StatusCode == http.StatusOK { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + + // Check for GaugeBucketpoint and so on for the rest.. uh + dp := []*Datapoint{} + if b != nil { + if err = json.Unmarshal(b, &dp); err != nil { + return nil, err + } + } + return dp, nil + } else if r.StatusCode > 399 { + return nil, self.parseErrorResponse(r) + } + + return nil, nil +} + +// TODO ReadMetrics should be equal also, to read new tagsFilter aggregation.. +func (self *Client) ReadBuckets(t MetricType, o ...Modifier) ([]*Bucketpoint, error) { + o = prepend(o, self.Url("GET", TypeEndpoint(t), DataEndpoint())) + + r, err := self.Send(o...) + if err != nil { + return nil, err + } + + defer r.Body.Close() + + if r.StatusCode == http.StatusOK { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return nil, err + } + + // Check for GaugeBucketpoint and so on for the rest.. uh + bp := []*Bucketpoint{} + if b != nil { + if err = json.Unmarshal(b, &bp); err != nil { + return nil, err + } + } + return bp, nil + } else if r.StatusCode > 399 { + return nil, self.parseErrorResponse(r) + } + + return nil, nil +} + +// Initialization + +func NewHawkularClient(p Parameters) (*Client, error) { + uri, err := url.Parse(p.Url) + if err != nil { + return nil, err + } + + if uri.Path == "" { + uri.Path = base_url + } + + u := &url.URL{ + Host: uri.Host, + Path: uri.Path, + Scheme: uri.Scheme, + Opaque: fmt.Sprintf("//%s/%s", uri.Host, uri.Path), + } + + c := &http.Client{ + Timeout: timeout, + } + if p.TLSConfig != nil { + transport := &http.Transport{TLSClientConfig: p.TLSConfig} + c.Transport = transport + } + + return &Client{ + url: u, + Tenant: p.Tenant, + Token: p.Token, + client: c, + }, nil +} + +// HTTP Helper functions + +func cleanId(id string) string { + return url.QueryEscape(id) +} + +func (self *Client) parseErrorResponse(resp *http.Response) error { + // Parse error messages here correctly.. + reply, err := ioutil.ReadAll(resp.Body) + if err != nil { + return &HawkularClientError{Code: resp.StatusCode, + msg: fmt.Sprintf("Reply could not be read: %s", err.Error()), + } + } + + details := &HawkularError{} + + err = json.Unmarshal(reply, details) + if err != nil { + return &HawkularClientError{Code: resp.StatusCode, + msg: fmt.Sprintf("Reply could not be parsed: %s", err.Error()), + } + } + + return &HawkularClientError{Code: resp.StatusCode, + msg: details.ErrorMsg, + } +} + +// URL functions (...) + +type Endpoint func(u *url.URL) + +func (self *Client) createUrl(e ...Endpoint) *url.URL { + mu := *self.url + for _, f := range e { + f(&mu) + } + return &mu +} + +func TypeEndpoint(t MetricType) Endpoint { + return func(u *url.URL) { + addToUrl(u, t.String()) + } +} + +func SingleMetricEndpoint(id string) Endpoint { + return func(u *url.URL) { + addToUrl(u, url.QueryEscape(id)) + } +} + +func TagEndpoint() Endpoint { + return func(u *url.URL) { + addToUrl(u, "tags") + } +} + +func TagsEndpoint(tags map[string]string) Endpoint { + return func(u *url.URL) { + addToUrl(u, tagsEncoder(tags)) + } +} + +func DataEndpoint() Endpoint { + return func(u *url.URL) { + addToUrl(u, "data") + } +} + +func addToUrl(u *url.URL, s string) *url.URL { + u.Opaque = fmt.Sprintf("%s/%s", u.Opaque, s) + return u +} + +func tagsEncoder(t map[string]string) string { + tags := make([]string, 0, len(t)) + for k, v := range t { + tags = append(tags, fmt.Sprintf("%s:%s", k, v)) + } + j := strings.Join(tags, ",") + return j +} diff --git a/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/metrics/helpers.go b/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/metrics/helpers.go new file mode 100644 index 00000000000..8c8893d123e --- /dev/null +++ b/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/metrics/helpers.go @@ -0,0 +1,50 @@ +package metrics + +import ( + "fmt" + "math" + "strconv" + "time" +) + +func ConvertToFloat64(v interface{}) (float64, error) { + switch i := v.(type) { + case float64: + return float64(i), nil + case float32: + return float64(i), nil + case int64: + return float64(i), nil + case int32: + return float64(i), nil + case int16: + return float64(i), nil + case int8: + return float64(i), nil + case uint64: + return float64(i), nil + case uint32: + return float64(i), nil + case uint16: + return float64(i), nil + case uint8: + return float64(i), nil + case int: + return float64(i), nil + case uint: + return float64(i), nil + case string: + f, err := strconv.ParseFloat(i, 64) + if err != nil { + return math.NaN(), err + } + return f, err + default: + return math.NaN(), fmt.Errorf("Cannot convert %s to float64", i) + } +} + +// Returns milliseconds since epoch +func UnixMilli(t time.Time) int64 { + return t.UnixNano() / 1e6 +} diff --git a/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/metrics/types.go b/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/metrics/types.go new file mode 100644 index 00000000000..8276a2db8d0 --- /dev/null +++ b/Godeps/_workspace/src/github.com/hawkular/hawkular-client-go/metrics/types.go @@ -0,0 +1,129 @@ +package metrics + +import ( + "encoding/json" + "fmt" + // "time" +) + +// MetricType restrictions +type MetricType int + +const ( + Gauge = iota + Availability + Counter + Generic +) + +var longForm = []string{ + "gauges", + "availability", + "counters", + "metrics", +} + +var shortForm = []string{ + "gauge", + "availability", + "counter", + "metrics", +} + +func (self MetricType) validate() error { + if int(self) > len(longForm) && int(self) > len(shortForm) { + return fmt.Errorf("Given MetricType value %d is not valid", self) + } + return nil +} + +func (self MetricType) String() string { + if err := self.validate(); err != nil { + return "unknown" + } + return longForm[self] +} + +func (self MetricType) shortForm() string { + if err := self.validate(); err != nil { + return "unknown" + } + return shortForm[self] +} + +// Custom unmarshaller +func (self *MetricType) UnmarshalJSON(b []byte) error { + var f interface{} + err := json.Unmarshal(b, &f) + if err != nil { + return err + } + + if str, ok := f.(string); ok { + for i, v := range shortForm { + if str == v { + *self = MetricType(i) + break + } + } + } + + return nil +} + +func (self MetricType) MarshalJSON() ([]byte, error) { + return json.Marshal(self.String()) +} + +type SortKey struct { + Tenant string + Type MetricType +} + +// Hawkular-Metrics external structs +// Do I need external.. hmph. + +type MetricHeader struct { + Tenant string `json:"-"` + Type MetricType `json:"-"` + Id string `json:"id"` + Data []Datapoint `json:"data"` +} + +// Value should be convertible to float64 for numeric values +// Timestamp is milliseconds since epoch +type Datapoint struct { + Timestamp int64 `json:"timestamp"` + Value interface{} `json:"value"` + Tags map[string]string `json:"tags,omitempty"` +} + +type HawkularError struct { + ErrorMsg string `json:"errorMsg"` +} + +type MetricDefinition struct { + Tenant string `json:"-"` + Type MetricType `json:"type,omitempty"` + Id string `json:"id"` + Tags map[string]string `json:"tags,omitempty"` + RetentionTime int `json:"dataRetention,omitempty"` +} + +// TODO Fix the Start & End to return a time.Time +type Bucketpoint struct { + Start int64 `json:"start"` + End int64 `json:"end"` + Min float64 `json:"min"` + Max float64 `json:"max"` + Avg float64 `json:"avg"` + Median float64 `json:"median"` + Empty bool `json:"empty"` + Samples int64 `json:"samples"` + Percentiles []Percentile `json:"percentiles"` +} + +type Percentile struct { + Quantile float64 `json:"quantile"` + Value float64 `json:"value"` +} diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index b6d44a13a43..05f2e79d678 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -157,6 +157,7 @@ ir-influxdb-host ir-namespace-only ir-password ir-user +ir-hawkular jenkins-host jenkins-jobs k8s-build-output diff --git a/plugin/pkg/admission/initialresources/data_source.go b/plugin/pkg/admission/initialresources/data_source.go index c391c007f7d..a8b41621d5c 100644 --- a/plugin/pkg/admission/initialresources/data_source.go +++ b/plugin/pkg/admission/initialresources/data_source.go @@ -28,8 +28,9 @@ var ( influxdbHost = flag.String("ir-influxdb-host", "localhost:8080/api/v1/proxy/namespaces/kube-system/services/monitoring-influxdb:api", "Address of InfluxDB which contains metrics requred by InitialResources") user = flag.String("ir-user", "root", "User used for connecting to InfluxDB") // TODO: figure out how to better pass password here - password = flag.String("ir-password", "root", "Password used for connecting to InfluxDB") - db = flag.String("ir-dbname", "k8s", "InfluxDB database name which contains metrics requred by InitialResources") + password = flag.String("ir-password", "root", "Password used for connecting to InfluxDB") + db = flag.String("ir-dbname", "k8s", "InfluxDB database name which contains metrics requred by InitialResources") + hawkularConfig = flag.String("ir-hawkular", "", "Hawkular configuration URL") ) // WARNING: If you are planning to add another implementation of dataSource interface please bear in mind, @@ -50,7 +51,7 @@ func newDataSource(kind string) (dataSource, error) { return newGcmSource() } if kind == "hawkular" { - return newHawkularSource() + return newHawkularSource(*hawkularConfig) } return nil, fmt.Errorf("Unknown data source %v", kind) } diff --git a/plugin/pkg/admission/initialresources/hawkular.go b/plugin/pkg/admission/initialresources/hawkular.go index 1482b0d75c6..9f82d6a6fac 100644 --- a/plugin/pkg/admission/initialresources/hawkular.go +++ b/plugin/pkg/admission/initialresources/hawkular.go @@ -17,18 +17,209 @@ limitations under the License. package initialresources import ( + "crypto/tls" + "crypto/x509" "fmt" + "github.com/golang/glog" + "github.com/hawkular/hawkular-client-go/metrics" + "io/ioutil" + "k8s.io/kubernetes/pkg/api" + "net/http" + "net/url" + "strconv" + "strings" "time" - "k8s.io/kubernetes/pkg/api" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" ) -type hawkularSource struct{} - -func newHawkularSource() (dataSource, error) { - return nil, fmt.Errorf("hawkular source not implemented") +type hawkularSource struct { + client *metrics.Client + uri *url.URL + useNamespace bool + modifiers []metrics.Modifier } -func (s *hawkularSource) GetUsagePercentile(kind api.ResourceName, perc int64, image, namespace string, exactMatch bool, start, end time.Time) (int64, int64, error) { - return 0, 0, fmt.Errorf("gcm source not implemented") +const ( + containerImageTag string = "container_base_image" + descriptorTag string = "descriptor_name" + separator string = "/" + + defaultServiceAccountFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" +) + +// heapsterName gets the equivalent MetricDescriptor.Name used in the Heapster +func heapsterName(kind api.ResourceName) string { + switch kind { + case api.ResourceCPU: + return "cpu/usage" + case api.ResourceMemory: + return "memory/usage" + default: + return "" + } +} + +// tagQuery creates tagFilter query for Hawkular +func tagQuery(kind api.ResourceName, image string, exactMatch bool) map[string]string { + q := make(map[string]string) + + // Add here the descriptor_tag.. + q[descriptorTag] = heapsterName(kind) + + if exactMatch { + q[containerImageTag] = image + } else { + split := strings.Index(image, "@") + if split < 0 { + split = strings.Index(image, ":") + } + q[containerImageTag] = fmt.Sprintf("%s:*", image[:split]) + } + + return q +} + +// dataSource API + +func (hs *hawkularSource) GetUsagePercentile(kind api.ResourceName, perc int64, image, namespace string, exactMatch bool, start, end time.Time) (int64, int64, error) { + q := tagQuery(kind, image, exactMatch) + + m := make([]metrics.Modifier, len(hs.modifiers), 2+len(hs.modifiers)) + copy(m, hs.modifiers) + + if namespace != api.NamespaceAll { + m = append(m, metrics.Tenant(namespace)) + } + + p, err := metrics.ConvertToFloat64(perc) + if err != nil { + return 0, 0, err + } + + m = append(m, metrics.Filters(metrics.TagsFilter(q), metrics.BucketsFilter(1), metrics.StartTimeFilter(start), metrics.EndTimeFilter(end), metrics.PercentilesFilter([]float64{p}))) + + bp, err := hs.client.ReadBuckets(metrics.Counter, m...) + if err != nil { + return 0, 0, err + } + + if len(bp) > 0 && len(bp[0].Percentiles) > 0 { + return int64(bp[0].Percentiles[0].Value), int64(bp[0].Samples), nil + } + return 0, 0, nil +} + +// newHawkularSource creates a new Hawkular Source. The uri follows the scheme from Heapster +func newHawkularSource(uri string) (dataSource, error) { + u, err := url.Parse(uri) + if err != nil { + return nil, err + } + + d := &hawkularSource{ + uri: u, + } + if err = d.init(); err != nil { + return nil, err + } + return d, nil +} + +// init initializes the Hawkular dataSource. Almost equal to the Heapster initialization +func (hs *hawkularSource) init() error { + hs.modifiers = make([]metrics.Modifier, 0) + p := metrics.Parameters{ + Tenant: "heapster", // This data is stored by the heapster - for no-namespace hits + Url: hs.uri.String(), + } + + opts := hs.uri.Query() + + if v, found := opts["tenant"]; found { + p.Tenant = v[0] + } + + if v, found := opts["useServiceAccount"]; found { + if b, _ := strconv.ParseBool(v[0]); b { + accountFile := defaultServiceAccountFile + if file, f := opts["serviceAccountFile"]; f { + accountFile = file[0] + } + + // If a readable service account token exists, then use it + if contents, err := ioutil.ReadFile(accountFile); err == nil { + p.Token = string(contents) + } else { + glog.Errorf("Could not read contents of %s, no token authentication is used\n", defaultServiceAccountFile) + } + } + } + + // Authentication / Authorization parameters + tC := &tls.Config{} + + if v, found := opts["auth"]; found { + if _, f := opts["caCert"]; f { + return fmt.Errorf("Both auth and caCert files provided, combination is not supported") + } + if len(v[0]) > 0 { + // Authfile + kubeConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(&clientcmd.ClientConfigLoadingRules{ + ExplicitPath: v[0]}, + &clientcmd.ConfigOverrides{}).ClientConfig() + if err != nil { + return err + } + tC, err = client.TLSConfigFor(kubeConfig) + if err != nil { + return err + } + } + } + + if u, found := opts["user"]; found { + if _, wrong := opts["useServiceAccount"]; wrong { + return fmt.Errorf("If user and password are used, serviceAccount cannot be used") + } + if p, f := opts["pass"]; f { + hs.modifiers = append(hs.modifiers, func(req *http.Request) error { + req.SetBasicAuth(u[0], p[0]) + return nil + }) + } + } + + if v, found := opts["caCert"]; found { + caCert, err := ioutil.ReadFile(v[0]) + if err != nil { + return err + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + tC.RootCAs = caCertPool + } + + if v, found := opts["insecure"]; found { + insecure, err := strconv.ParseBool(v[0]) + if err != nil { + return err + } + tC.InsecureSkipVerify = insecure + } + + p.TLSConfig = tC + + c, err := metrics.NewHawkularClient(p) + if err != nil { + return err + } + + hs.client = c + + glog.Infof("Initialised Hawkular Source with parameters %v", p) + return nil } diff --git a/plugin/pkg/admission/initialresources/hawkular_test.go b/plugin/pkg/admission/initialresources/hawkular_test.go new file mode 100644 index 00000000000..6b526173bd8 --- /dev/null +++ b/plugin/pkg/admission/initialresources/hawkular_test.go @@ -0,0 +1,104 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package initialresources + +import ( + "fmt" + "k8s.io/kubernetes/pkg/api" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + assert "github.com/stretchr/testify/require" +) + +const ( + testImageName string = "hawkular/hawkular-metrics" + testImageVersion string = "latest" + testImageSHA string = "b727ece3780cdd30e9a86226e520f26bcc396071ed7a86b7ef6684bb93a9f717" + testPartialMatch string = "hawkular/hawkular-metrics:*" +) + +func testImageWithVersion() string { + return fmt.Sprintf("%s:%s", testImageName, testImageVersion) +} + +func testImageWithReference() string { + return fmt.Sprintf("%s@sha256:%s", testImageName, testImageSHA) +} + +func TestTaqQuery(t *testing.T) { + kind := api.ResourceCPU + tQ := tagQuery(kind, testImageWithVersion(), false) + + assert.Equal(t, 2, len(tQ)) + assert.Equal(t, testPartialMatch, tQ[containerImageTag]) + assert.Equal(t, "cpu/usage", tQ[descriptorTag]) + + tQe := tagQuery(kind, testImageWithVersion(), true) + assert.Equal(t, 2, len(tQe)) + assert.Equal(t, testImageWithVersion(), tQe[containerImageTag]) + assert.Equal(t, "cpu/usage", tQe[descriptorTag]) + + tQr := tagQuery(kind, testImageWithReference(), false) + assert.Equal(t, 2, len(tQe)) + assert.Equal(t, testPartialMatch, tQr[containerImageTag]) + assert.Equal(t, "cpu/usage", tQr[descriptorTag]) + + tQre := tagQuery(kind, testImageWithReference(), true) + assert.Equal(t, 2, len(tQe)) + assert.Equal(t, testImageWithReference(), tQre[containerImageTag]) + assert.Equal(t, "cpu/usage", tQre[descriptorTag]) +} + +func TestGetUsagePercentile(t *testing.T) { + tenant := "16a8884e4c155457ee38a8901df6b536" + reqs := make(map[string]string) + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, tenant, r.Header.Get("Hawkular-Tenant")) + assert.Equal(t, "Basic", r.Header.Get("Authorization")[:5]) + + if strings.Contains(r.RequestURI, "counters/data") { + assert.True(t, strings.Contains(r.RequestURI, url.QueryEscape(testImageWithVersion()))) + assert.True(t, strings.Contains(r.RequestURI, "cpu%2Fusage")) + assert.True(t, strings.Contains(r.RequestURI, "percentiles=90")) + + reqs["counters/data"] = r.RequestURI + fmt.Fprintf(w, ` [{"start":1444620095882,"end":1444648895882,"min":1.45,"avg":1.45,"median":1.45,"max":1.45,"percentile95th":1.45,"samples":123456,"percentiles":[{"value":7896.54,"quantile":0.9},{"value":1.45,"quantile":0.99}],"empty":false}]`) + } else { + reqs["unknown"] = r.RequestURI + } + })) + + paramUri := fmt.Sprintf("%s?user=test&pass=yep", s.URL) + + hSource, err := newHawkularSource(paramUri) + assert.NoError(t, err) + + usage, samples, err := hSource.GetUsagePercentile(api.ResourceCPU, 90, testImageWithVersion(), "16a8884e4c155457ee38a8901df6b536", true, time.Now(), time.Now()) + assert.NoError(t, err) + + assert.Equal(t, 1, len(reqs)) + assert.Equal(t, "", reqs["unknown"]) + + assert.Equal(t, int64(123456), int64(samples)) + assert.Equal(t, int64(7896), usage) // float64 -> int64 +}