Merge pull request #14972 from piosz/ir-namespace

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-10-02 11:18:27 -07:00
commit 6b7fe871b2
7 changed files with 338 additions and 197 deletions

View File

@ -122,6 +122,7 @@ iptables-sync-period
ir-data-source ir-data-source
ir-dbname ir-dbname
ir-influxdb-host ir-influxdb-host
ir-namespace-only
ir-password ir-password
ir-user ir-user
jenkins-host jenkins-host

View File

@ -32,8 +32,9 @@ import (
) )
var ( var (
source = flag.String("ir-data-source", "influxdb", "Data source used by InitialResources. Supported options: influxdb.") source = flag.String("ir-data-source", "influxdb", "Data source used by InitialResources. Supported options: influxdb, gcm.")
percentile = flag.Int64("ir-percentile", 90, "Which percentile of samples should InitialResources use when estimating resources. For experiment purposes.") percentile = flag.Int64("ir-percentile", 90, "Which percentile of samples should InitialResources use when estimating resources. For experiment purposes.")
nsOnly = flag.Bool("ir-namespace-only", false, "Whether the estimation should be made only based on data from the same namespace.")
) )
const ( const (
@ -50,19 +51,23 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newInitialResources(s), nil return newInitialResources(s, *percentile, *nsOnly), nil
}) })
} }
type initialResources struct { type initialResources struct {
*admission.Handler *admission.Handler
source dataSource source dataSource
percentile int64
nsOnly bool
} }
func newInitialResources(source dataSource) admission.Interface { func newInitialResources(source dataSource, percentile int64, nsOnly bool) admission.Interface {
return &initialResources{ return &initialResources{
Handler: admission.NewHandler(admission.Create), Handler: admission.NewHandler(admission.Create),
source: source, source: source,
percentile: percentile,
nsOnly: nsOnly,
} }
} }
@ -92,7 +97,7 @@ func (ir initialResources) estimateAndFillResourcesIfNotSet(pod *api.Pod) {
var err error var err error
if _, ok := req[api.ResourceCPU]; !ok { if _, ok := req[api.ResourceCPU]; !ok {
if _, ok2 := lim[api.ResourceCPU]; !ok2 { if _, ok2 := lim[api.ResourceCPU]; !ok2 {
cpu, err = ir.getEstimation(api.ResourceCPU, c) cpu, err = ir.getEstimation(api.ResourceCPU, c, pod.ObjectMeta.Namespace)
if err != nil { if err != nil {
glog.Errorf("Error while trying to estimate resources: %v", err) glog.Errorf("Error while trying to estimate resources: %v", err)
} }
@ -100,7 +105,7 @@ func (ir initialResources) estimateAndFillResourcesIfNotSet(pod *api.Pod) {
} }
if _, ok := req[api.ResourceMemory]; !ok { if _, ok := req[api.ResourceMemory]; !ok {
if _, ok2 := lim[api.ResourceMemory]; !ok2 { if _, ok2 := lim[api.ResourceMemory]; !ok2 {
mem, err = ir.getEstimation(api.ResourceMemory, c) mem, err = ir.getEstimation(api.ResourceMemory, c, pod.ObjectMeta.Namespace)
if err != nil { if err != nil {
glog.Errorf("Error while trying to estimate resources: %v", err) glog.Errorf("Error while trying to estimate resources: %v", err)
} }
@ -138,31 +143,58 @@ func (ir initialResources) estimateAndFillResourcesIfNotSet(pod *api.Pod) {
} }
} }
func (ir initialResources) getEstimation(kind api.ResourceName, c *api.Container) (*resource.Quantity, error) { func (ir initialResources) getEstimation(kind api.ResourceName, c *api.Container, ns string) (*resource.Quantity, error) {
end := time.Now() end := time.Now()
start := end.Add(-week) start := end.Add(-week)
var usage, samples int64 var usage, samples int64
var err error var err error
// Historical data from last 7 days for the same image:tag. // Historical data from last 7 days for the same image:tag within the same namespace.
if usage, samples, err = ir.source.GetUsagePercentile(kind, *percentile, c.Image, true, start, end); err != nil { if usage, samples, err = ir.source.GetUsagePercentile(kind, ir.percentile, c.Image, ns, true, start, end); err != nil {
return nil, err return nil, err
} }
if samples < samplesThreshold { if samples < samplesThreshold {
// Historical data from last 30 days for the same image:tag. // Historical data from last 30 days for the same image:tag within the same namespace.
start := end.Add(-month) start := end.Add(-month)
if usage, samples, err = ir.source.GetUsagePercentile(kind, *percentile, c.Image, true, start, end); err != nil { if usage, samples, err = ir.source.GetUsagePercentile(kind, ir.percentile, c.Image, ns, true, start, end); err != nil {
return nil, err return nil, err
} }
} }
// If we are allowed to estimate only based on data from the same namespace.
if ir.nsOnly {
if samples < samplesThreshold { if samples < samplesThreshold {
// Historical data from last 30 days for the same image. // Historical data from last 30 days for the same image within the same namespace.
start := end.Add(-month) start := end.Add(-month)
image := strings.Split(c.Image, ":")[0] image := strings.Split(c.Image, ":")[0]
if usage, samples, err = ir.source.GetUsagePercentile(kind, *percentile, image, false, start, end); err != nil { if usage, samples, err = ir.source.GetUsagePercentile(kind, ir.percentile, image, ns, false, start, end); err != nil {
return nil, err return nil, err
} }
} }
} else {
if samples < samplesThreshold {
// Historical data from last 7 days for the same image:tag within all namespaces.
start := end.Add(-week)
if usage, samples, err = ir.source.GetUsagePercentile(kind, ir.percentile, c.Image, "", true, start, end); err != nil {
return nil, err
}
}
if samples < samplesThreshold {
// Historical data from last 30 days for the same image:tag within all namespaces.
start := end.Add(-month)
if usage, samples, err = ir.source.GetUsagePercentile(kind, ir.percentile, c.Image, "", true, start, end); err != nil {
return nil, err
}
}
if samples < samplesThreshold {
// Historical data from last 30 days for the same image within all namespaces.
start := end.Add(-month)
image := strings.Split(c.Image, ":")[0]
if usage, samples, err = ir.source.GetUsagePercentile(kind, ir.percentile, image, "", false, start, end); err != nil {
return nil, err
}
}
}
if samples > 0 && kind == api.ResourceCPU { if samples > 0 && kind == api.ResourceCPU {
return resource.NewMilliQuantity(usage, resource.DecimalSI), nil return resource.NewMilliQuantity(usage, resource.DecimalSI), nil

View File

@ -26,11 +26,11 @@ import (
) )
type fakeSource struct { type fakeSource struct {
f func(kind api.ResourceName, perc int64, image string, exactMatch bool, start, end time.Time) (int64, int64, error) f func(kind api.ResourceName, perc int64, image, namespace string, exactMatch bool, start, end time.Time) (int64, int64, error)
} }
func (s *fakeSource) GetUsagePercentile(kind api.ResourceName, perc int64, image string, exactMatch bool, start, end time.Time) (usage int64, samples int64, err error) { func (s *fakeSource) GetUsagePercentile(kind api.ResourceName, perc int64, image, namespace string, exactMatch bool, start, end time.Time) (usage int64, samples int64, err error) {
return s.f(kind, perc, image, exactMatch, start, end) return s.f(kind, perc, image, namespace, exactMatch, start, end)
} }
func parseReq(cpu, mem string) api.ResourceList { func parseReq(cpu, mem string) api.ResourceList {
@ -57,7 +57,7 @@ func addContainer(pod *api.Pod, name, image string, request api.ResourceList) {
func createPod(name string, image string, request api.ResourceList) *api.Pod { func createPod(name string, image string, request api.ResourceList) *api.Pod {
pod := &api.Pod{ pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: name, Namespace: "test"}, ObjectMeta: api.ObjectMeta{Name: name, Namespace: "test-ns"},
Spec: api.PodSpec{}, Spec: api.PodSpec{},
} }
pod.Spec.Containers = []api.Container{} pod.Spec.Containers = []api.Container{}
@ -113,15 +113,7 @@ func admit(t *testing.T, ir admission.Interface, pods []*api.Pod) {
} }
} }
func TestEstimationBasedOnTheSameImage7d(t *testing.T) { func performTest(t *testing.T, ir admission.Interface) {
f := func(_ api.ResourceName, _ int64, _ string, exactMatch bool, start, end time.Time) (int64, int64, error) {
if exactMatch && end.Sub(start) == week {
return 100, 120, nil
}
return 200, 120, nil
}
ir := newInitialResources(&fakeSource{f: f})
pods := getPods() pods := getPods()
admit(t, ir, pods) admit(t, ir, pods)
@ -136,50 +128,74 @@ func TestEstimationBasedOnTheSameImage7d(t *testing.T) {
expectNoAnnotation(t, pods[3]) expectNoAnnotation(t, pods[3])
} }
func TestEstimationBasedOnTheSameImage30d(t *testing.T) { func TestEstimationBasedOnTheSameImageSameNamespace7d(t *testing.T) {
f := func(_ api.ResourceName, _ int64, _ string, exactMatch bool, start, end time.Time) (int64, int64, error) { f := func(_ api.ResourceName, _ int64, _, ns string, exactMatch bool, start, end time.Time) (int64, int64, error) {
if exactMatch && end.Sub(start) == week { if exactMatch && end.Sub(start) == week && ns == "test-ns" {
return 200, 20, nil
}
if exactMatch && end.Sub(start) == month {
return 100, 120, nil return 100, 120, nil
} }
return 200, 120, nil return 200, 120, nil
} }
ir := newInitialResources(&fakeSource{f: f}) performTest(t, newInitialResources(&fakeSource{f: f}, 90, false))
pods := getPods() }
admit(t, ir, pods)
verifyPod(t, pods[0], 100, 100) func TestEstimationBasedOnTheSameImageSameNamespace30d(t *testing.T) {
verifyPod(t, pods[1], 100, 300) f := func(_ api.ResourceName, _ int64, _, ns string, exactMatch bool, start, end time.Time) (int64, int64, error) {
verifyPod(t, pods[2], 300, 100) if exactMatch && end.Sub(start) == week && ns == "test-ns" {
verifyPod(t, pods[3], 300, 300) return 200, 20, nil
}
if exactMatch && end.Sub(start) == month && ns == "test-ns" {
return 100, 120, nil
}
return 200, 120, nil
}
performTest(t, newInitialResources(&fakeSource{f: f}, 90, false))
}
func TestEstimationBasedOnTheSameImageAllNamespaces7d(t *testing.T) {
f := func(_ api.ResourceName, _ int64, _, ns string, exactMatch bool, start, end time.Time) (int64, int64, error) {
if exactMatch && ns == "test-ns" {
return 200, 20, nil
}
if exactMatch && end.Sub(start) == week && ns == "" {
return 100, 120, nil
}
return 200, 120, nil
}
performTest(t, newInitialResources(&fakeSource{f: f}, 90, false))
}
func TestEstimationBasedOnTheSameImageAllNamespaces30d(t *testing.T) {
f := func(_ api.ResourceName, _ int64, _, ns string, exactMatch bool, start, end time.Time) (int64, int64, error) {
if exactMatch && ns == "test-ns" {
return 200, 20, nil
}
if exactMatch && end.Sub(start) == week && ns == "" {
return 200, 20, nil
}
if exactMatch && end.Sub(start) == month && ns == "" {
return 100, 120, nil
}
return 200, 120, nil
}
performTest(t, newInitialResources(&fakeSource{f: f}, 90, false))
} }
func TestEstimationBasedOnOtherImages(t *testing.T) { func TestEstimationBasedOnOtherImages(t *testing.T) {
f := func(_ api.ResourceName, _ int64, image string, exactMatch bool, _, _ time.Time) (int64, int64, error) { f := func(_ api.ResourceName, _ int64, image, ns string, exactMatch bool, _, _ time.Time) (int64, int64, error) {
if image == "image" && !exactMatch { if image == "image" && !exactMatch && ns == "" {
return 100, 5, nil return 100, 5, nil
} }
return 200, 20, nil return 200, 20, nil
} }
ir := newInitialResources(&fakeSource{f: f}) performTest(t, newInitialResources(&fakeSource{f: f}, 90, false))
pods := getPods()
admit(t, ir, pods)
verifyPod(t, pods[0], 100, 100)
verifyPod(t, pods[1], 100, 300)
verifyPod(t, pods[2], 300, 100)
verifyPod(t, pods[3], 300, 300)
} }
func TestNoData(t *testing.T) { func TestNoData(t *testing.T) {
f := func(_ api.ResourceName, _ int64, _ string, _ bool, _, _ time.Time) (int64, int64, error) { f := func(_ api.ResourceName, _ int64, _, ns string, _ bool, _, _ time.Time) (int64, int64, error) {
return 200, 0, nil return 200, 0, nil
} }
ir := newInitialResources(&fakeSource{f: f}) ir := newInitialResources(&fakeSource{f: f}, 90, false)
pods := []*api.Pod{ pods := []*api.Pod{
createPod("p0", "image:v0", parseReq("", "")), createPod("p0", "image:v0", parseReq("", "")),
@ -194,13 +210,13 @@ func TestNoData(t *testing.T) {
} }
func TestManyContainers(t *testing.T) { func TestManyContainers(t *testing.T) {
f := func(_ api.ResourceName, _ int64, _ string, exactMatch bool, _, _ time.Time) (int64, int64, error) { f := func(_ api.ResourceName, _ int64, _, ns string, exactMatch bool, _, _ time.Time) (int64, int64, error) {
if exactMatch { if exactMatch {
return 100, 120, nil return 100, 120, nil
} }
return 200, 30, nil return 200, 30, nil
} }
ir := newInitialResources(&fakeSource{f: f}) ir := newInitialResources(&fakeSource{f: f}, 90, false)
pod := createPod("p", "image:v0", parseReq("", "")) pod := createPod("p", "image:v0", parseReq("", ""))
addContainer(pod, "c1", "image:v1", parseReq("", "300")) addContainer(pod, "c1", "image:v1", parseReq("", "300"))
@ -215,3 +231,24 @@ func TestManyContainers(t *testing.T) {
verifyAnnotation(t, pod, "Initial Resources plugin set: cpu, memory request for container c0; cpu request for container c1; memory request for container c2") verifyAnnotation(t, pod, "Initial Resources plugin set: cpu, memory request for container c0; cpu request for container c1; memory request for container c2")
} }
func TestNamespaceAware(t *testing.T) {
f := func(_ api.ResourceName, _ int64, _, ns string, exactMatch bool, start, end time.Time) (int64, int64, error) {
if ns == "test-ns" {
return 200, 0, nil
}
return 200, 120, nil
}
ir := newInitialResources(&fakeSource{f: f}, 90, true)
pods := []*api.Pod{
createPod("p0", "image:v0", parseReq("", "")),
}
admit(t, ir, pods)
if pods[0].Spec.Containers[0].Resources.Requests != nil {
t.Errorf("Unexpected resource estimation")
}
expectNoAnnotation(t, pods[0])
}

View File

@ -19,22 +19,11 @@ package initialresources
import ( import (
"flag" "flag"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/golang/glog"
influxdb "github.com/influxdb/influxdb/client"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
) )
const (
cpuSeriesName = "autoscaling.cpu.usage.1m"
memSeriesName = "autoscaling.memory.usage.1m"
cpuContinuousQuery = "select derivative(value) as value from \"cpu/usage_ns_cumulative\" where pod_id <> '' group by pod_id, container_name, container_base_image, time(1m) into " + cpuSeriesName
memContinuousQuery = "select mean(value) as value from \"memory/usage_bytes_gauge\" where pod_id <> '' group by pod_id, container_name, container_base_image, time(1m) into " + memSeriesName
timeFormat = "2006-01-02 15:04:05"
)
var ( var (
influxdbHost = flag.String("ir-influxdb-host", "localhost:8080/api/v1/proxy/namespaces/kube-system/services/monitoring-influxdb:api", "Address of InfluxDB which contains metrics requred by InitialResources") influxdbHost = flag.String("ir-influxdb-host", "localhost:8080/api/v1/proxy/namespaces/kube-system/services/monitoring-influxdb:api", "Address of InfluxDB which contains metrics requred by InitialResources")
user = flag.String("ir-user", "root", "User used for connecting to InfluxDB") user = flag.String("ir-user", "root", "User used for connecting to InfluxDB")
@ -50,7 +39,7 @@ type dataSource interface {
// withing time range (start, end), number of samples considered and error if occured. // withing time range (start, end), number of samples considered and error if occured.
// If <exactMatch> then take only samples that concern the same image (both name and take are the same), // If <exactMatch> then take only samples that concern the same image (both name and take are the same),
// otherwise consider also samples with the same image a possibly different tag. // otherwise consider also samples with the same image a possibly different tag.
GetUsagePercentile(kind api.ResourceName, perc int64, image string, exactMatch bool, start, end time.Time) (usage int64, samples int64, err error) GetUsagePercentile(kind api.ResourceName, perc int64, image, namespace string, exactMatch bool, start, end time.Time) (usage int64, samples int64, err error)
} }
func newDataSource(kind string) (dataSource, error) { func newDataSource(kind string) (dataSource, error) {
@ -60,123 +49,8 @@ func newDataSource(kind string) (dataSource, error) {
if kind == "gcm" { if kind == "gcm" {
return newGcmSource() return newGcmSource()
} }
if kind == "hawkular" {
return newHawkularSource()
}
return nil, fmt.Errorf("Unknown data source %v", kind) return nil, fmt.Errorf("Unknown data source %v", kind)
} }
// TODO(piosz): rewrite this once we will migrate into InfluxDB v0.9.
type influxdbSource struct {
conf *influxdb.ClientConfig
}
func newInfluxdbSource(host, user, password, db string) (dataSource, error) {
conf := &influxdb.ClientConfig{
Host: host,
Username: user,
Password: password,
Database: db,
}
source := &influxdbSource{
conf: conf,
}
go source.ensureAutoscalingSeriesExist()
return source, nil
}
func ensureSeriesExists(conn *influxdb.Client, existingQueries *influxdb.Series, seriesName, contQuery string) error {
queryExists := false
for _, p := range existingQueries.GetPoints() {
id := p[1].(float64)
query := p[2].(string)
if strings.Contains(query, "into "+seriesName) {
if query != contQuery {
if _, err := conn.Query(fmt.Sprintf("drop continuous query %v", id), influxdb.Second); err != nil {
return err
}
} else {
queryExists = true
}
}
}
if !queryExists {
if _, err := conn.Query("drop series "+seriesName, influxdb.Second); err != nil {
return err
}
if _, err := conn.Query(contQuery, influxdb.Second); err != nil {
return err
}
}
return nil
}
func (s *influxdbSource) ensureAutoscalingSeriesExist() {
for {
time.Sleep(30 * time.Second)
client, err := influxdb.NewClient(s.conf)
if err != nil {
glog.Errorf("Error while trying to create InfluxDB client: %v", err)
continue
}
series, err := client.Query("list continuous queries", influxdb.Second)
if err != nil {
glog.Errorf("Error while trying to list continuous queries: %v", err)
continue
}
if err := ensureSeriesExists(client, series[0], cpuSeriesName, cpuContinuousQuery); err != nil {
glog.Errorf("Error while trying to create create autoscaling series: %v", err)
continue
}
if err := ensureSeriesExists(client, series[0], memSeriesName, memContinuousQuery); err != nil {
glog.Errorf("Error while trying to create create autoscaling series: %v", err)
continue
}
break
}
}
func (s *influxdbSource) query(query string, precision ...influxdb.TimePrecision) ([]*influxdb.Series, error) {
client, err := influxdb.NewClient(s.conf)
if err != nil {
return nil, err
}
return client.Query(query, precision...)
}
func (s *influxdbSource) GetUsagePercentile(kind api.ResourceName, perc int64, image string, exactMatch bool, start, end time.Time) (int64, int64, error) {
var series string
if kind == api.ResourceCPU {
series = cpuSeriesName
} else if kind == api.ResourceMemory {
series = memSeriesName
}
var imgPattern string
if exactMatch {
imgPattern = "='" + image + "'"
} else {
imgPattern = "=~/^" + image + "/"
}
query := fmt.Sprintf("select percentile(value, %v), count(pod_id) from %v where container_base_image%v and time > '%v' and time < '%v'", perc, series, imgPattern, start.UTC().Format(timeFormat), end.UTC().Format(timeFormat))
var res []*influxdb.Series
var err error
if res, err = s.query(query, influxdb.Second); err != nil {
return 0, 0, fmt.Errorf("Error while trying to query InfluxDB: %v", err)
}
// TODO(pszczesniak): fix issue with dropped data base
if len(res) == 0 {
return 0, 0, fmt.Errorf("Missing series %v in InfluxDB", series)
}
points := res[0].GetPoints()
if len(points) == 0 {
return 0, 0, fmt.Errorf("Missing data in series %v in InfluxDB", series)
}
p := points[0]
usage := p[1].(float64)
count := p[2].(float64)
if kind == api.ResourceCPU {
// convert from ns to millicores
usage = usage / 1000000
}
return int64(usage), int64(count), nil
}

View File

@ -34,6 +34,7 @@ const (
cpuMetricName = kubePrefix + "cpu/usage_rate" cpuMetricName = kubePrefix + "cpu/usage_rate"
memMetricName = kubePrefix + "memory/usage" memMetricName = kubePrefix + "memory/usage"
labelImage = kubePrefix + "label/container_base_image" labelImage = kubePrefix + "label/container_base_image"
labelNs = kubePrefix + "label/pod_namespace"
) )
type gcmSource struct { type gcmSource struct {
@ -61,12 +62,14 @@ func newGcmSource() (dataSource, error) {
}, nil }, nil
} }
func (s *gcmSource) query(metric, oldest, youngest, label, pageToken string) (*gcm.ListTimeseriesResponse, error) { func (s *gcmSource) query(metric, oldest, youngest string, labels []string, pageToken string) (*gcm.ListTimeseriesResponse, error) {
req := s.gcmService.Timeseries.List(s.project, metric, youngest, nil). req := s.gcmService.Timeseries.List(s.project, metric, youngest, nil).
Oldest(oldest). Oldest(oldest).
Labels(label).
Aggregator("mean"). Aggregator("mean").
Window("1m") Window("1m")
for _, l := range labels {
req = req.Labels(l)
}
if pageToken != "" { if pageToken != "" {
req = req.PageToken(pageToken) req = req.PageToken(pageToken)
} }
@ -81,7 +84,7 @@ func retrieveRawSamples(res *gcm.ListTimeseriesResponse, output *[]int) {
} }
} }
func (s *gcmSource) GetUsagePercentile(kind api.ResourceName, perc int64, image string, exactMatch bool, start, end time.Time) (int64, int64, error) { func (s *gcmSource) GetUsagePercentile(kind api.ResourceName, perc int64, image, namespace string, exactMatch bool, start, end time.Time) (int64, int64, error) {
var metric string var metric string
if kind == api.ResourceCPU { if kind == api.ResourceCPU {
metric = cpuMetricName metric = cpuMetricName
@ -89,11 +92,14 @@ func (s *gcmSource) GetUsagePercentile(kind api.ResourceName, perc int64, image
metric = memMetricName metric = memMetricName
} }
var label string var labels []string
if exactMatch { if exactMatch {
label = labelImage + "==" + image labels = append(labels, labelImage+"=="+image)
} else { } else {
label = labelImage + "=~" + image + ".*" labels = append(labels, labelImage+"=~"+image+".*")
}
if namespace != "" {
labels = append(labels, labelNs+"=="+namespace)
} }
oldest := start.Format(time.RFC3339) oldest := start.Format(time.RFC3339)
@ -102,7 +108,7 @@ func (s *gcmSource) GetUsagePercentile(kind api.ResourceName, perc int64, image
rawSamples := make([]int, 0) rawSamples := make([]int, 0)
pageToken := "" pageToken := ""
for { for {
res, err := s.query(metric, oldest, youngest, label, pageToken) res, err := s.query(metric, oldest, youngest, labels, pageToken)
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
} }

View File

@ -0,0 +1,34 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package initialresources
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api"
)
type hawkularSource struct{}
func newHawkularSource() (dataSource, error) {
return nil, fmt.Errorf("hawkular source not implemented")
}
func (s *hawkularSource) GetUsagePercentile(kind api.ResourceName, perc int64, image, namespace string, exactMatch bool, start, end time.Time) (int64, int64, error) {
return 0, 0, fmt.Errorf("gcm source not implemented")
}

View File

@ -0,0 +1,157 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package initialresources
import (
"fmt"
"strings"
"time"
"github.com/golang/glog"
influxdb "github.com/influxdb/influxdb/client"
"k8s.io/kubernetes/pkg/api"
)
const (
cpuSeriesName = "autoscaling.cpu.usage.1m"
memSeriesName = "autoscaling.memory.usage.1m"
cpuContinuousQuery = "select derivative(value) as value from \"cpu/usage_ns_cumulative\" where pod_id <> '' group by pod_id, pod_namespace, container_name, container_base_image, time(1m) into " + cpuSeriesName
memContinuousQuery = "select mean(value) as value from \"memory/usage_bytes_gauge\" where pod_id <> '' group by pod_id, pod_namespace, container_name, container_base_image, time(1m) into " + memSeriesName
timeFormat = "2006-01-02 15:04:05"
)
// TODO(piosz): rewrite this once we will migrate into InfluxDB v0.9.
type influxdbSource struct {
conf *influxdb.ClientConfig
}
func newInfluxdbSource(host, user, password, db string) (dataSource, error) {
conf := &influxdb.ClientConfig{
Host: host,
Username: user,
Password: password,
Database: db,
}
source := &influxdbSource{
conf: conf,
}
go source.ensureAutoscalingSeriesExist()
return source, nil
}
func ensureSeriesExists(conn *influxdb.Client, existingQueries *influxdb.Series, seriesName, contQuery string) error {
queryExists := false
for _, p := range existingQueries.GetPoints() {
id := p[1].(float64)
query := p[2].(string)
if strings.Contains(query, "into "+seriesName) {
if query != contQuery {
if _, err := conn.Query(fmt.Sprintf("drop continuous query %v", id), influxdb.Second); err != nil {
return err
}
} else {
queryExists = true
}
}
}
if !queryExists {
if _, err := conn.Query("drop series "+seriesName, influxdb.Second); err != nil {
return err
}
if _, err := conn.Query(contQuery, influxdb.Second); err != nil {
return err
}
}
return nil
}
func (s *influxdbSource) ensureAutoscalingSeriesExist() {
for {
time.Sleep(30 * time.Second)
client, err := influxdb.NewClient(s.conf)
if err != nil {
glog.Errorf("Error while trying to create InfluxDB client: %v", err)
continue
}
series, err := client.Query("list continuous queries", influxdb.Second)
if err != nil {
glog.Errorf("Error while trying to list continuous queries: %v", err)
continue
}
if err := ensureSeriesExists(client, series[0], cpuSeriesName, cpuContinuousQuery); err != nil {
glog.Errorf("Error while trying to create create autoscaling series: %v", err)
continue
}
if err := ensureSeriesExists(client, series[0], memSeriesName, memContinuousQuery); err != nil {
glog.Errorf("Error while trying to create create autoscaling series: %v", err)
continue
}
break
}
}
func (s *influxdbSource) query(query string, precision ...influxdb.TimePrecision) ([]*influxdb.Series, error) {
client, err := influxdb.NewClient(s.conf)
if err != nil {
return nil, err
}
return client.Query(query, precision...)
}
func (s *influxdbSource) GetUsagePercentile(kind api.ResourceName, perc int64, image, namespace string, exactMatch bool, start, end time.Time) (int64, int64, error) {
var series string
if kind == api.ResourceCPU {
series = cpuSeriesName
} else if kind == api.ResourceMemory {
series = memSeriesName
}
var imgPattern string
if exactMatch {
imgPattern = "='" + image + "'"
} else {
imgPattern = "=~/^" + image + "/"
}
var namespaceCond string
if namespace != "" {
namespaceCond = " and pod_namespace='" + namespace + "'"
}
query := fmt.Sprintf("select percentile(value, %v), count(pod_id) from %v where container_base_image%v%v and time > '%v' and time < '%v'", perc, series, imgPattern, namespaceCond, start.UTC().Format(timeFormat), end.UTC().Format(timeFormat))
var res []*influxdb.Series
var err error
if res, err = s.query(query, influxdb.Second); err != nil {
return 0, 0, fmt.Errorf("Error while trying to query InfluxDB: %v", err)
}
// TODO(pszczesniak): fix issue with dropped data base
if len(res) == 0 {
return 0, 0, fmt.Errorf("Missing series %v in InfluxDB", series)
}
points := res[0].GetPoints()
if len(points) == 0 {
return 0, 0, fmt.Errorf("Missing data in series %v in InfluxDB", series)
}
p := points[0]
usage := p[1].(float64)
count := p[2].(float64)
if kind == api.ResourceCPU {
// convert from ns to millicores
usage = usage / 1000000
}
return int64(usage), int64(count), nil
}