diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 08f0431dbfb..7fecb9eff71 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -615,6 +615,11 @@ "ImportPath": "gopkg.in/yaml.v2", "Rev": "d466437aa4adc35830964cffc5b5f262c63ddcb4" }, + { + "ImportPath": "k8s.io/heapster/api/v1/types", + "Comment": "v0.17.0-75-g0e1b652", + "Rev": "0e1b652781812dee2c51c75180fc590223e0b9c6" + }, { "ImportPath": "speter.net/go/exp/math/dec/inf", "Rev": "42ca6cd68aa922bc3f32f1e056e61b65945d9ad7" diff --git a/Godeps/_workspace/src/k8s.io/heapster/api/v1/types/model_types.go b/Godeps/_workspace/src/k8s.io/heapster/api/v1/types/model_types.go new file mode 100644 index 00000000000..ed3d418e598 --- /dev/null +++ b/Godeps/_workspace/src/k8s.io/heapster/api/v1/types/model_types.go @@ -0,0 +1,33 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "time" +) + +type MetricPoint struct { + Timestamp time.Time `json:"timestamp"` + Value uint64 `json:"value"` +} + +type MetricResult struct { + Metrics []MetricPoint `json:"metrics"` + LatestTimestamp time.Time `json:"latestTimestamp"` +} + +type MetricResultList struct { + Items []MetricResult `json:"items"` +} diff --git a/Godeps/_workspace/src/k8s.io/heapster/api/v1/types/types.go b/Godeps/_workspace/src/k8s.io/heapster/api/v1/types/types.go new file mode 100644 index 00000000000..d644ac80138 --- /dev/null +++ b/Godeps/_workspace/src/k8s.io/heapster/api/v1/types/types.go @@ -0,0 +1,81 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "time" +) + +// Timeseries represents a set of metrics for the same target object +// (typically a container). +type Timeseries struct { + // Map of metric names to their values. + Metrics map[string][]Point `json:"metrics"` + + // Common labels for all metrics. + Labels map[string]string `json:"labels,omitempty"` +} + +// Point represent a metric value. +type Point struct { + // The start and end time for which this data is representative. + Start time.Time `json:"start"` + End time.Time `json:"end"` + + // Labels specific to this data point. + Labels map[string]string `json:"labels,omitempty"` + + // The value of the metric. + Value interface{} `json:"value"` +} + +// TimeseriesSchema represents all the metrics and labels. +type TimeseriesSchema struct { + // All the metrics handled by heapster. + Metrics []MetricDescriptor `json:"metrics,omitempty"` + // Labels that are common to all metrics. + CommonLabels []LabelDescriptor `json:"common_labels,omitempty"` + // Labels that are present only for containers in pods. + // A container metric belongs to a pod is "pod_name" label is set. + PodLabels []LabelDescriptor `json:"pod_labels,omitempty"` +} + +type MetricDescriptor struct { + // The unique name of the metric. + Name string `json:"name,omitempty"` + + // Description of the metric. + Description string `json:"description,omitempty"` + + // Descriptor of the labels specific to this metric. + Labels []LabelDescriptor `json:"labels,omitempty"` + + // Type and value of metric data. + Type string `json:"type,omitempty"` + + // The type of value returned as part of this metric. + ValueType string `json:"value_type,omitempty"` + + // The units of the value returned as part of this metric. + Units string `json:"units,omitempty"` +} + +type LabelDescriptor struct { + // Key to use for the label. + Key string `json:"key,omitempty"` + + // Description of the label. + Description string `json:"description,omitempty"` +} diff --git a/pkg/client/unversioned/scale.go b/pkg/client/unversioned/scale.go index 4a3ba9a00f2..c0d4b662e37 100644 --- a/pkg/client/unversioned/scale.go +++ b/pkg/client/unversioned/scale.go @@ -30,6 +30,7 @@ type ScaleNamespacer interface { // ScaleInterface has methods to work with Scale (sub)resources. type ScaleInterface interface { Get(string, string) (*expapi.Scale, error) + Update(string, *expapi.Scale) (*expapi.Scale, error) } // horizontalPodAutoscalers implements HorizontalPodAutoscalersNamespacer interface @@ -57,3 +58,22 @@ func (c *scales) Get(kind string, name string) (result *expapi.Scale, err error) err = fmt.Errorf("Kind not supported: %s", kind) return } + +func (c *scales) Update(kind string, scale *expapi.Scale) (result *expapi.Scale, err error) { + result = &expapi.Scale{} + if strings.ToLower(kind) == "replicationcontroller" { + kind = "replicationControllers" + + err = c.client.Put(). + Namespace(scale.Namespace). + Resource(kind). + Name(scale.Name). + SubResource("scale"). + Body(scale). + Do(). + Into(result) + return + } + err = fmt.Errorf("Kind not supported: %s", kind) + return +} diff --git a/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go b/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go index 1e486a16dc1..c713eecf7aa 100644 --- a/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go +++ b/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go @@ -17,16 +17,21 @@ limitations under the License. package autoscalercontroller import ( + "encoding/json" "fmt" "strings" "time" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/expapi" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util" + + heapster "k8s.io/heapster/api/v1/types" ) const ( @@ -34,16 +39,38 @@ const ( heapsterService = "monitoring-heapster" ) -var resourceToMetric = map[api.ResourceName]string{ - api.ResourceCPU: "cpu-usage", -} -var heapsterQueryStart, _ = time.ParseDuration("-20m") - type HorizontalPodAutoscalerController struct { client *client.Client expClient client.ExperimentalInterface } +// Aggregates results into ResourceConsumption. Also returns number of +// pods included in the aggregation. +type metricAggregator func(heapster.MetricResultList) (expapi.ResourceConsumption, int) + +type metricDefinition struct { + name string + aggregator metricAggregator +} + +var resourceDefinitions = map[api.ResourceName]metricDefinition{ + //TODO: add memory + api.ResourceCPU: {"cpu-usage", + func(metrics heapster.MetricResultList) (expapi.ResourceConsumption, int) { + sum, count := calculateSumFromLatestSample(metrics) + value := "0" + if count > 0 { + // assumes that cpu usage is in millis + value = fmt.Sprintf("%dm", sum/uint64(count)) + } + return expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count + }}, +} + +var heapsterQueryStart, _ = time.ParseDuration("-5m") +var downscaleForbiddenWindow, _ = time.ParseDuration("20m") +var upscaleForbiddenWindow, _ = time.ParseDuration("3m") + func New(client *client.Client, expClient client.ExperimentalInterface) *HorizontalPodAutoscalerController { //TODO: switch to client.Interface return &HorizontalPodAutoscalerController{ @@ -86,16 +113,18 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { podNames = append(podNames, pod.Name) } - metric, metricDefined := resourceToMetric[hpa.Spec.Target.Resource] + metricSpec, metricDefined := resourceDefinitions[hpa.Spec.Target.Resource] if !metricDefined { glog.Warningf("Heapster metric not defined for %s %v", reference, hpa.Spec.Target.Resource) continue } - startTime := time.Now().Add(heapsterQueryStart) + now := time.Now() + + startTime := now.Add(heapsterQueryStart) metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s", hpa.Spec.ScaleRef.Namespace, strings.Join(podNames, ","), - metric) + metricSpec.name) resultRaw, err := a.client. Get(). @@ -113,7 +142,90 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { continue } + var metrics heapster.MetricResultList + err = json.Unmarshal(resultRaw, &metrics) + if err != nil { + glog.Warningf("Failed to unmarshall heapster response: %v", err) + continue + } + glog.Infof("Metrics available for %s: %s", reference, string(resultRaw)) + + currentConsumption, count := metricSpec.aggregator(metrics) + if count != len(podList.Items) { + glog.Warningf("Metrics obtained for %d/%d of pods", count, len(podList.Items)) + continue + } + + // if the ratio is 1.2 we want to have 2 replicas + desiredReplicas := 1 + int((currentConsumption.Quantity.MilliValue()*int64(count))/hpa.Spec.Target.Quantity.MilliValue()) + + if desiredReplicas < hpa.Spec.MinCount { + desiredReplicas = hpa.Spec.MinCount + } + if desiredReplicas > hpa.Spec.MaxCount { + desiredReplicas = hpa.Spec.MaxCount + } + + rescale := false + + if desiredReplicas != count { + // Going down + if desiredReplicas < count && (hpa.Status.LastScaleTimestamp == nil || + hpa.Status.LastScaleTimestamp.Add(downscaleForbiddenWindow).Before(now)) { + rescale = true + } + + // Going up + if desiredReplicas > count && (hpa.Status.LastScaleTimestamp == nil || + hpa.Status.LastScaleTimestamp.Add(upscaleForbiddenWindow).Before(now)) { + rescale = true + } + + if rescale { + scale.Spec.Replicas = desiredReplicas + _, err = a.expClient.Scales(hpa.Namespace).Update(hpa.Spec.ScaleRef.Kind, scale) + if err != nil { + glog.Warningf("Failed to rescale %s: %v", reference, err) + continue + } + } + } + + hpa.Status = expapi.HorizontalPodAutoscalerStatus{ + CurrentReplicas: count, + DesiredReplicas: desiredReplicas, + CurrentConsumption: currentConsumption, + } + if rescale { + now := util.NewTime(now) + hpa.Status.LastScaleTimestamp = &now + } + + _, err = a.expClient.HorizontalPodAutoscalers(hpa.Namespace).Update(&hpa) + if err != nil { + glog.Warningf("Failed to update HorizontalPodAutoscaler %s: %v", hpa.Name, err) + continue + } } return nil } + +func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, int) { + sum := uint64(0) + count := 0 + for _, metrics := range metrics.Items { + var newest *heapster.MetricPoint + newest = nil + for _, metricPoint := range metrics.Metrics { + if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) { + newest = &metricPoint + } + } + if newest != nil { + sum += newest.Value + count++ + } + } + return sum, count +} diff --git a/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go b/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go index b7ab76b1cf4..9ef640d46d1 100644 --- a/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go +++ b/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go @@ -17,10 +17,12 @@ limitations under the License. package autoscalercontroller import ( + "encoding/json" "fmt" "net/http" "net/http/httptest" "testing" + "time" "k8s.io/kubernetes/pkg/api" _ "k8s.io/kubernetes/pkg/api/latest" @@ -32,6 +34,9 @@ import ( "k8s.io/kubernetes/pkg/util" "github.com/golang/glog" + "github.com/stretchr/testify/assert" + + heapster "k8s.io/heapster/api/v1/types" ) const ( @@ -39,20 +44,23 @@ const ( rcName = "app-rc" podNameLabel = "app" podName = "p1" -) + hpaName = "foo" -var target = expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse("0.8")} + hpaListHandler = "HpaList" + scaleHandler = "Scale" + podListHandler = "PodList" + heapsterHandler = "Heapster" + updateHpaHandler = "HpaUpdate" +) type serverResponse struct { statusCode int obj interface{} } -func makeTestServer(t *testing.T, hpaResponse serverResponse, - scaleResponse serverResponse, podListResponse serverResponse, - heapsterResponse serverResponse) (*httptest.Server, []*util.FakeHandler) { +func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) { - handlers := []*util.FakeHandler{} + handlers := map[string]*util.FakeHandler{} mux := http.NewServeMux() mkHandler := func(url string, response serverResponse) *util.FakeHandler { @@ -75,13 +83,29 @@ func makeTestServer(t *testing.T, hpaResponse serverResponse, return &handler } - handlers = append(handlers, mkHandler("/experimental/v1/horizontalpodautoscalers", hpaResponse)) - handlers = append(handlers, mkHandler( - fmt.Sprintf("/experimental/v1/namespaces/%s/replicationcontrollers/%s/scale", namespace, rcName), scaleResponse)) - handlers = append(handlers, mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), podListResponse)) - handlers = append(handlers, mkRawHandler( - fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage", - namespace, podName), heapsterResponse)) + if responses[hpaListHandler] != nil { + handlers[hpaListHandler] = mkHandler("/experimental/v1/horizontalpodautoscalers", *responses[hpaListHandler]) + } + + if responses[scaleHandler] != nil { + handlers[scaleHandler] = mkHandler( + fmt.Sprintf("/experimental/v1/namespaces/%s/replicationcontrollers/%s/scale", namespace, rcName), *responses[scaleHandler]) + } + + if responses[podListHandler] != nil { + handlers[podListHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), *responses[podListHandler]) + } + + if responses[heapsterHandler] != nil { + handlers[heapsterHandler] = mkRawHandler( + fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage", + namespace, podName), *responses[heapsterHandler]) + } + + if responses[updateHpaHandler] != nil { + handlers[updateHpaHandler] = mkHandler(fmt.Sprintf("/experimental/v1/namespaces/%s/horizontalpodautoscalers/%s", namespace, hpaName), + *responses[updateHpaHandler]) + } mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { t.Errorf("unexpected request: %v", req.RequestURI) @@ -96,7 +120,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { Items: []expapi.HorizontalPodAutoscaler{ { ObjectMeta: api.ObjectMeta{ - Name: "foo", + Name: hpaName, Namespace: namespace, }, Spec: expapi.HorizontalPodAutoscalerSpec{ @@ -108,20 +132,20 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { }, MinCount: 1, MaxCount: 5, - Target: target, + Target: expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse("0.3")}, }, }}}} scaleResponse := serverResponse{http.StatusOK, &expapi.Scale{ ObjectMeta: api.ObjectMeta{ - Name: "rcName", + Name: rcName, Namespace: namespace, }, Spec: expapi.ScaleSpec{ - Replicas: 5, + Replicas: 1, }, Status: expapi.ScaleStatus{ - Replicas: 2, + Replicas: 1, Selector: map[string]string{"name": podNameLabel}, }, }} @@ -134,11 +158,49 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { Namespace: namespace, }, }}}} + timestamp := time.Now() + metrics := heapster.MetricResultList{ + Items: []heapster.MetricResult{{ + Metrics: []heapster.MetricPoint{{timestamp, 650}}, + LatestTimestamp: timestamp, + }}} - heapsterRawResponse := "UPADTE ME" - heapsterResponse := serverResponse{http.StatusOK, &heapsterRawResponse} + updateHpaResponse := serverResponse{http.StatusOK, &expapi.HorizontalPodAutoscaler{ + + ObjectMeta: api.ObjectMeta{ + Name: hpaName, + Namespace: namespace, + }, + Spec: expapi.HorizontalPodAutoscalerSpec{ + ScaleRef: &expapi.SubresourceReference{ + Kind: "replicationController", + Name: rcName, + Namespace: namespace, + Subresource: "scale", + }, + MinCount: 1, + MaxCount: 5, + Target: expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse("0.3")}, + }, + Status: expapi.HorizontalPodAutoscalerStatus{ + CurrentReplicas: 1, + DesiredReplicas: 3, + }, + }} + + heapsterRawResponse, _ := json.Marshal(&metrics) + heapsterStrResponse := string(heapsterRawResponse) + heapsterResponse := serverResponse{http.StatusOK, &heapsterStrResponse} + + testServer, handlers := makeTestServer(t, + map[string]*serverResponse{ + hpaListHandler: &hpaResponse, + scaleHandler: &scaleResponse, + podListHandler: &podListResponse, + heapsterHandler: &heapsterResponse, + updateHpaHandler: &updateHpaResponse, + }) - testServer, handlers := makeTestServer(t, hpaResponse, scaleResponse, podListResponse, heapsterResponse) defer testServer.Close() kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) expClient := client.NewExperimentalOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -146,9 +208,18 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { hpaController := New(kubeClient, expClient) err := hpaController.reconcileAutoscalers() if err != nil { - t.Fatal("Failed to reconcile %v", err) + t.Fatal("Failed to reconcile: %v", err) } for _, h := range handlers { h.ValidateRequestCount(t, 1) } + obj, err := expClient.Codec.Decode([]byte(handlers[updateHpaHandler].RequestBody)) + if err != nil { + t.Fatal("Failed to decode: %v %v", err) + } + hpa, _ := obj.(*expapi.HorizontalPodAutoscaler) + + assert.Equal(t, 3, hpa.Status.DesiredReplicas) + assert.Equal(t, int64(650), hpa.Status.CurrentConsumption.Quantity.MilliValue()) + assert.NotNil(t, hpa.Status.LastScaleTimestamp) }