Merge pull request #18065 from jszczepkowski/hpa-fix

Fixed forbidden window enforcement in horizontal pod autoscaler.
Marek Grabowski 2015-12-03 08:36:10 +01:00
commit f633aa67b1
3 changed files with 86 additions and 65 deletions
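
The gist of the fix: the controller previously compared LastScaleTime against time.Now() when enforcing the scale-up and scale-down forbidden windows, even though the scaling decision itself was computed from metrics that may be older. After this change, the windows are enforced against the generation time of the metrics, so a decision based on stale data cannot slip through the window early. A minimal sketch of the corrected check, with simplified types and illustrative window lengths (not the controller's actual code):

package main

import (
	"fmt"
	"time"
)

// hpaStatus is a simplified stand-in for the HPA status fields involved.
type hpaStatus struct {
	LastScaleTime *time.Time
}

// Illustrative window lengths, assumed for this example.
const (
	upscaleForbiddenWindow   = 3 * time.Minute
	downscaleForbiddenWindow = 5 * time.Minute
)

// canRescale mirrors the fixed logic: the forbidden window is checked
// against the timestamp of the metrics that produced the decision,
// not against time.Now().
func canRescale(status hpaStatus, desired, current int, metricsTimestamp time.Time) bool {
	if desired > current {
		return status.LastScaleTime == nil ||
			status.LastScaleTime.Add(upscaleForbiddenWindow).Before(metricsTimestamp)
	}
	if desired < current {
		return status.LastScaleTime == nil ||
			status.LastScaleTime.Add(downscaleForbiddenWindow).Before(metricsTimestamp)
	}
	return false
}

func main() {
	lastScale := time.Now().Add(-4 * time.Minute)
	status := hpaStatus{LastScaleTime: &lastScale}
	// Metrics generated two minutes after the last rescale fall inside the
	// three-minute upscale window, so the fixed check refuses to act on
	// them, while a time.Now() comparison would already allow it.
	staleMetrics := lastScale.Add(2 * time.Minute)
	fmt.Println(canRescale(status, 5, 3, staleMetrics)) // false
	fmt.Println(canRescale(status, 5, 3, time.Now()))   // true
}

Note that LastScaleTime is still stamped with time.Now() when a rescale actually happens, as the last hunk of the controller diff below shows.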

File 1 of 3: the horizontal pod autoscaler controller

@@ -68,27 +68,27 @@ func (a *HorizontalController) Run(syncPeriod time.Duration) {
 	}, syncPeriod, util.NeverStop)
 }
 
-func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale) (int, *int, error) {
+func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale) (int, *int, time.Time, error) {
 	if hpa.Spec.CPUUtilization == nil {
 		// If CPUTarget is not specified then we should return some default values.
 		// Since we always take maximum number of replicas from all policies it is safe
 		// to just return 0.
-		return 0, nil, nil
+		return 0, nil, time.Time{}, nil
 	}
 	currentReplicas := scale.Status.Replicas
-	currentUtilization, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, scale.Status.Selector)
+	currentUtilization, timestamp, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, scale.Status.Selector)
 
 	// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
 	if err != nil {
 		a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedGetMetrics", err.Error())
-		return 0, nil, fmt.Errorf("failed to get cpu utilization: %v", err)
+		return 0, nil, time.Time{}, fmt.Errorf("failed to get cpu utilization: %v", err)
 	}
 
 	usageRatio := float64(*currentUtilization) / float64(hpa.Spec.CPUUtilization.TargetPercentage)
 	if math.Abs(1.0-usageRatio) > tolerance {
-		return int(math.Ceil(usageRatio * float64(currentReplicas))), currentUtilization, nil
+		return int(math.Ceil(usageRatio * float64(currentReplicas))), currentUtilization, timestamp, nil
 	} else {
-		return currentReplicas, currentUtilization, nil
+		return currentReplicas, currentUtilization, timestamp, nil
 	}
 }
@@ -102,7 +102,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 	}
 	currentReplicas := scale.Status.Replicas
-	desiredReplicas, currentUtilization, err := a.computeReplicasForCPUUtilization(hpa, scale)
+	desiredReplicas, currentUtilization, timestamp, err := a.computeReplicasForCPUUtilization(hpa, scale)
 	if err != nil {
 		a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeReplicas", err.Error())
 		return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
@@ -120,7 +120,6 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 	if desiredReplicas > hpa.Spec.MaxReplicas {
 		desiredReplicas = hpa.Spec.MaxReplicas
 	}
-	now := time.Now()
 	rescale := false
 
 	if desiredReplicas != currentReplicas {
@@ -128,7 +127,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 		// and there was no rescaling in the last downscaleForbiddenWindow.
 		if desiredReplicas < currentReplicas &&
 			(hpa.Status.LastScaleTime == nil ||
-				hpa.Status.LastScaleTime.Add(downscaleForbiddenWindow).Before(now)) {
+				hpa.Status.LastScaleTime.Add(downscaleForbiddenWindow).Before(timestamp)) {
 			rescale = true
 		}
@@ -136,7 +135,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 		// and there was no rescaling in the last upscaleForbiddenWindow.
 		if desiredReplicas > currentReplicas &&
 			(hpa.Status.LastScaleTime == nil ||
-				hpa.Status.LastScaleTime.Add(upscaleForbiddenWindow).Before(now)) {
+				hpa.Status.LastScaleTime.Add(upscaleForbiddenWindow).Before(timestamp)) {
 			rescale = true
 		}
 	}
@@ -162,7 +161,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 		LastScaleTime: hpa.Status.LastScaleTime,
 	}
 	if rescale {
-		now := unversioned.NewTime(now)
+		now := unversioned.NewTime(time.Now())
 		hpa.Status.LastScaleTime = &now
 	}

File 2 of 3: the Heapster metrics client

@@ -44,10 +44,10 @@ var heapsterQueryStart = -5 * time.Minute
 
 // MetricsClient is an interface for getting metrics for pods.
 type MetricsClient interface {
-	// GetCPUUtilization returns average utilization over all pods
-	// represented as a percent of requested CPU, e.g. 70 means that
-	// an average pod uses 70% of the requested CPU.
-	GetCPUUtilization(namespace string, selector map[string]string) (*int, error)
+	// GetCPUUtilization returns the average utilization over all pods represented as a percent of requested CPU
+	// (e.g. 70 means that an average pod uses 70% of the requested CPU)
+	// and the time of generation of the oldest of the utilization reports for pods.
+	GetCPUUtilization(namespace string, selector map[string]string) (*int, time.Time, error)
 }
 
 // ResourceConsumption specifies consumption of a particular resource.
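
As an illustration of the widened signature, a consumer now receives the report timestamp alongside the utilization. A hedged sketch (cpuUtilizationAge is a hypothetical helper, not part of the PR; it assumes the package's MetricsClient interface and a time import):

// cpuUtilizationAge returns the average utilization as a percentage of
// requested CPU (70 means 70%) together with the age of the aggregate,
// i.e. how long ago the oldest contributing per-pod report was generated.
func cpuUtilizationAge(m MetricsClient, namespace string, selector map[string]string) (int, time.Duration, error) {
	utilization, timestamp, err := m.GetCPUUtilization(namespace, selector)
	if err != nil {
		return 0, 0, err
	}
	return *utilization, time.Since(timestamp), nil
}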
@@ -56,9 +56,8 @@ type ResourceConsumption struct {
 	Quantity resource.Quantity
 }
 
-// Aggregates results into ResourceConsumption. Also returns number of
-// pods included in the aggregation.
-type metricAggregator func(heapster.MetricResultList) (ResourceConsumption, int)
+// Aggregates results into ResourceConsumption. Also returns the number of pods included in the aggregation and the timestamp of the oldest sample used.
+type metricAggregator func(heapster.MetricResultList) (ResourceConsumption, int, time.Time)
 
 type metricDefinition struct {
 	name string
@@ -77,23 +76,23 @@ type HeapsterMetricsClient struct {
 
 var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
 	api.ResourceCPU: {"cpu-usage",
-		func(metrics heapster.MetricResultList) (ResourceConsumption, int) {
-			sum, count := calculateSumFromLatestSample(metrics)
+		func(metrics heapster.MetricResultList) (ResourceConsumption, int, time.Time) {
+			sum, count, timestamp := calculateSumFromLatestSample(metrics)
 			value := "0"
 			if count > 0 {
 				// assumes that cpu usage is in millis
 				value = fmt.Sprintf("%dm", sum/uint64(count))
 			}
-			return ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count
+			return ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count, timestamp
 		}},
 	api.ResourceMemory: {"memory-usage",
-		func(metrics heapster.MetricResultList) (ResourceConsumption, int) {
-			sum, count := calculateSumFromLatestSample(metrics)
+		func(metrics heapster.MetricResultList) (ResourceConsumption, int, time.Time) {
+			sum, count, timestamp := calculateSumFromLatestSample(metrics)
 			value := int64(0)
 			if count > 0 {
 				value = int64(sum) / int64(count)
 			}
-			return ResourceConsumption{Resource: api.ResourceMemory, Quantity: *resource.NewQuantity(value, resource.DecimalSI)}, count
+			return ResourceConsumption{Resource: api.ResourceMemory, Quantity: *resource.NewQuantity(value, resource.DecimalSI)}, count, timestamp
 		}},
 }
@@ -109,22 +108,22 @@ func NewHeapsterMetricsClient(client client.Interface, namespace, scheme, servic
 	}
 }
 
-func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector map[string]string) (*int, error) {
-	consumption, request, err := h.GetResourceConsumptionAndRequest(api.ResourceCPU, namespace, selector)
+func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector map[string]string) (*int, time.Time, error) {
+	consumption, request, timestamp, err := h.GetResourceConsumptionAndRequest(api.ResourceCPU, namespace, selector)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get CPU consumption and request: %v", err)
+		return nil, time.Time{}, fmt.Errorf("failed to get CPU consumption and request: %v", err)
 	}
 	utilization := new(int)
 	*utilization = int(float64(consumption.Quantity.MilliValue()) / float64(request.MilliValue()) * 100)
-	return utilization, nil
+	return utilization, timestamp, nil
 }
 
-func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName api.ResourceName, namespace string, selector map[string]string) (consumption *ResourceConsumption, request *resource.Quantity, err error) {
+func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName api.ResourceName, namespace string, selector map[string]string) (consumption *ResourceConsumption, request *resource.Quantity, timestamp time.Time, err error) {
 	podList, err := h.client.Pods(namespace).
 		List(labels.SelectorFromSet(labels.Set(selector)), fields.Everything(), unversioned.ListOptions{})
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed to get pod list: %v", err)
+		return nil, nil, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
 	}
 	podNames := []string{}
 	sum := resource.MustParse("0")
@@ -141,22 +140,22 @@ func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName ap
 		}
 	}
 	if missing || sum.Cmp(resource.MustParse("0")) == 0 {
-		return nil, nil, fmt.Errorf("some pods do not have request for %s", resourceName)
+		return nil, nil, time.Time{}, fmt.Errorf("some pods do not have request for %s", resourceName)
 	}
 	glog.Infof("Sum of %s requested: %v", resourceName, sum)
 	avg := resource.MustParse(fmt.Sprintf("%dm", sum.MilliValue()/int64(len(podList.Items))))
 	request = &avg
-	consumption, err = h.getForPods(resourceName, namespace, podNames)
+	consumption, timestamp, err = h.getForPods(resourceName, namespace, podNames)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, time.Time{}, err
 	}
-	return consumption, request, nil
+	return consumption, request, timestamp, nil
 }
 
-func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namespace string, podNames []string) (*ResourceConsumption, error) {
+func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namespace string, podNames []string) (*ResourceConsumption, time.Time, error) {
 	metricSpec, metricDefined := h.resourceDefinitions[resourceName]
 	if !metricDefined {
-		return nil, fmt.Errorf("heapster metric not defined for %v", resourceName)
+		return nil, time.Time{}, fmt.Errorf("heapster metric not defined for %v", resourceName)
 	}
 	now := time.Now()
@@ -171,30 +170,33 @@ func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namesp
 		DoRaw()
 	if err != nil {
-		return nil, fmt.Errorf("failed to get pods metrics: %v", err)
+		return nil, time.Time{}, fmt.Errorf("failed to get pods metrics: %v", err)
 	}
 	var metrics heapster.MetricResultList
 	err = json.Unmarshal(resultRaw, &metrics)
 	if err != nil {
-		return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err)
+		return nil, time.Time{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
 	}
 	glog.Infof("Metrics available: %s", string(resultRaw))
-	currentConsumption, count := metricSpec.aggregator(metrics)
+	currentConsumption, count, timestamp := metricSpec.aggregator(metrics)
 	if count != len(podNames) {
-		return nil, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames))
+		return nil, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames))
 	}
-	return &currentConsumption, nil
+	return &currentConsumption, timestamp, nil
 }
 
-func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, int) {
-	sum := uint64(0)
-	count := 0
+func calculateSumFromLatestSample(metrics heapster.MetricResultList) (sum uint64, count int, timestamp time.Time) {
+	sum = uint64(0)
+	count = 0
+	timestamp = time.Time{}
+	var oldest *time.Time // creation time of the oldest of the used samples across pods
+	oldest = nil
 	for _, metrics := range metrics.Items {
-		var newest *heapster.MetricPoint
+		var newest *heapster.MetricPoint // the newest sample for the pod
 		newest = nil
 		for i, metricPoint := range metrics.Metrics {
 			if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) {
@@ -202,9 +204,15 @@ func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, in
 			}
 		}
 		if newest != nil {
+			if oldest == nil || newest.Timestamp.Before(*oldest) {
+				oldest = &newest.Timestamp
+			}
 			sum += newest.Value
 			count++
 		}
 	}
-	return sum, count
+	if oldest != nil {
+		timestamp = *oldest
+	}
+	return sum, count, timestamp
 }
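
The timestamp returned by calculateSumFromLatestSample is the oldest of the per-pod newest samples, i.e. the most recent instant at which every counted value already existed. A self-contained sketch with simplified stand-in types (not the heapster API) that mirrors the loop above:

package main

import (
	"fmt"
	"time"
)

// sample is a simplified stand-in for heapster.MetricPoint.
type sample struct {
	Timestamp time.Time
	Value     uint64
}

// sumLatest mirrors calculateSumFromLatestSample: for each pod take the
// newest sample, sum those values, and report the oldest of the chosen
// timestamps as the aggregate's timestamp.
func sumLatest(pods [][]sample) (sum uint64, count int, timestamp time.Time) {
	var oldest *time.Time
	for _, points := range pods {
		var newest *sample
		for i := range points {
			if newest == nil || newest.Timestamp.Before(points[i].Timestamp) {
				newest = &points[i]
			}
		}
		if newest != nil {
			if oldest == nil || newest.Timestamp.Before(*oldest) {
				oldest = &newest.Timestamp
			}
			sum += newest.Value
			count++
		}
	}
	if oldest != nil {
		timestamp = *oldest
	}
	return sum, count, timestamp
}

func main() {
	t0 := time.Date(2015, time.November, 10, 12, 30, 0, 0, time.UTC)
	at := func(minute int) time.Time { return t0.Add(time.Duration(minute) * time.Minute) }
	pods := [][]sample{
		{{at(1), 1}, {at(5), 3000}}, // newest sample at minute 5
		{{at(3), 3000}, {at(1), 2}}, // newest sample at minute 3
	}
	sum, count, ts := sumLatest(pods)
	fmt.Println(sum, count, ts) // 6000 2 2015-11-10 12:33:00 +0000 UTC
}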

File 3 of 3: the metrics client tests

@@ -35,6 +35,8 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+var fixedTimestamp = time.Date(2015, time.November, 10, 12, 30, 0, 0, time.UTC)
+
 func (w fakeResponseWrapper) DoRaw() ([]byte, error) {
 	return w.raw, nil
 }
@@ -62,6 +64,7 @@ type testCase struct {
 	desiredValue          int64
 	desiredError          error
 	targetResource        api.ResourceName
+	targetTimestamp       int
 	reportedMetricsPoints [][]metricPoint
 	namespace             string
 	selector              map[string]string
@@ -108,12 +111,11 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 	fakeClient.AddProxyReactor("services", func(action testclient.Action) (handled bool, ret client.ResponseWrapper, err error) {
 		metrics := heapster.MetricResultList{}
-		firstTimestamp := time.Now()
 		var latestTimestamp time.Time
 		for _, reportedMetricPoints := range tc.reportedMetricsPoints {
 			var heapsterMetricPoints []heapster.MetricPoint
 			for _, reportedMetricPoint := range reportedMetricPoints {
-				timestamp := firstTimestamp.Add(time.Duration(reportedMetricPoint.timestamp) * time.Minute)
+				timestamp := fixedTimestamp.Add(time.Duration(reportedMetricPoint.timestamp) * time.Minute)
 				if latestTimestamp.Before(timestamp) {
 					latestTimestamp = timestamp
 				}
@@ -133,7 +135,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 	return fakeClient
 }
 
-func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, err error) {
+func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, timestamp time.Time, err error) {
 	assert.Equal(t, tc.desiredError, err)
 	if tc.desiredError != nil {
 		return
@@ -144,13 +146,15 @@ func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, err er
 	if tc.targetResource == api.ResourceMemory {
 		assert.Equal(t, tc.desiredValue, val.Quantity.Value())
 	}
+	targetTimestamp := fixedTimestamp.Add(time.Duration(tc.targetTimestamp) * time.Minute)
+	assert.Equal(t, targetTimestamp, timestamp)
 }
 
 func (tc *testCase) runTest(t *testing.T) {
 	testClient := tc.prepareTestClient(t)
 	metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterScheme, DefaultHeapsterService, DefaultHeapsterPort)
-	val, _, err := metricsClient.GetResourceConsumptionAndRequest(tc.targetResource, tc.namespace, tc.selector)
-	tc.verifyResults(t, val, err)
+	val, _, timestamp, err := metricsClient.GetResourceConsumptionAndRequest(tc.targetResource, tc.namespace, tc.selector)
+	tc.verifyResults(t, val, timestamp, err)
 }
 
 func TestCPU(t *testing.T) {
@@ -158,6 +162,7 @@ func TestCPU(t *testing.T) {
 		replicas:              3,
 		desiredValue:          5000,
 		targetResource:        api.ResourceCPU,
+		targetTimestamp:       1,
 		reportedMetricsPoints: [][]metricPoint{{{5000, 1}}, {{5000, 1}}, {{5000, 1}}},
 	}
 	tc.runTest(t)
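
In these tests, targetTimestamp counts minutes from fixedTimestamp and encodes the oldest-of-newest rule over the minute field of each pod's metricPoints. A quick self-contained check of that expectation against the fixtures shown in this file:

package main

import "fmt"

// expectedMinute computes the expected targetTimestamp in minutes from
// fixedTimestamp: for each pod take the newest sample's minute, then take
// the oldest of those. The fixture minutes below are read off the
// reportedMetricsPoints values in the surrounding tests.
func expectedMinute(podMinutes [][]int) int {
	oldest := -1
	for _, minutes := range podMinutes {
		newest := -1
		for _, m := range minutes {
			if m > newest {
				newest = m
			}
		}
		if newest >= 0 && (oldest == -1 || newest < oldest) {
			oldest = newest
		}
	}
	return oldest
}

func main() {
	fmt.Println(expectedMinute([][]int{{1}, {1}, {1}}))                // TestCPU: 1
	fmt.Println(expectedMinute([][]int{{3, 6, 4, 10}, {2, 5, 1, 10}})) // first two pods of TestCPUMoreMetrics: 10
}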
@@ -168,6 +173,7 @@ func TestMemory(t *testing.T) {
 		replicas:              3,
 		desiredValue:          5000,
 		targetResource:        api.ResourceMemory,
+		targetTimestamp:       1,
 		reportedMetricsPoints: [][]metricPoint{{{5000, 1}}, {{5000, 2}}, {{5000, 4}}},
 	}
 	tc.runTest(t)
@@ -178,6 +184,7 @@ func TestCPUSumEqualZero(t *testing.T) {
 		replicas:              3,
 		desiredValue:          0,
 		targetResource:        api.ResourceCPU,
+		targetTimestamp:       0,
 		reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}},
 	}
 	tc.runTest(t)
@@ -188,6 +195,7 @@ func TestMemorySumEqualZero(t *testing.T) {
 		replicas:              3,
 		desiredValue:          0,
 		targetResource:        api.ResourceMemory,
+		targetTimestamp:       0,
 		reportedMetricsPoints: [][]metricPoint{{{0, 0}}, {{0, 0}}, {{0, 0}}},
 	}
 	tc.runTest(t)
@@ -195,9 +203,10 @@ func TestMemorySumEqualZero(t *testing.T) {
 
 func TestCPUMoreMetrics(t *testing.T) {
 	tc := testCase{
-		replicas:       5,
-		desiredValue:   5000,
-		targetResource: api.ResourceCPU,
+		replicas:        5,
+		desiredValue:    5000,
+		targetResource:  api.ResourceCPU,
+		targetTimestamp: 10,
 		reportedMetricsPoints: [][]metricPoint{
 			{{0, 3}, {0, 6}, {5, 4}, {9000, 10}},
 			{{5000, 2}, {10, 5}, {66, 1}, {0, 10}},
@@ -210,9 +219,10 @@ func TestCPUMoreMetrics(t *testing.T) {
 
 func TestMemoryMoreMetrics(t *testing.T) {
 	tc := testCase{
-		replicas:       5,
-		desiredValue:   5000,
-		targetResource: api.ResourceMemory,
+		replicas:        5,
+		desiredValue:    5000,
+		targetResource:  api.ResourceMemory,
+		targetTimestamp: 10,
 		reportedMetricsPoints: [][]metricPoint{
 			{{0, 3}, {0, 6}, {5, 4}, {9000, 10}},
 			{{5000, 2}, {10, 5}, {66, 1}, {0, 10}},
@@ -228,6 +238,7 @@ func TestCPUResultIsFloat(t *testing.T) {
 		replicas:              6,
 		desiredValue:          4783,
 		targetResource:        api.ResourceCPU,
+		targetTimestamp:       4,
 		reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {{9500, 4}}, {{3000, 4}}, {{7000, 4}}, {{3200, 4}}, {{2000, 4}}},
 	}
 	tc.runTest(t)
@@ -238,6 +249,7 @@ func TestMemoryResultIsFloat(t *testing.T) {
 		replicas:              6,
 		desiredValue:          4783,
 		targetResource:        api.ResourceMemory,
+		targetTimestamp:       4,
 		reportedMetricsPoints: [][]metricPoint{{{4000, 4}}, {{9500, 4}}, {{3000, 4}}, {{7000, 4}}, {{3200, 4}}, {{2000, 4}}},
 	}
 	tc.runTest(t)
@@ -245,22 +257,24 @@ func TestMemoryResultIsFloat(t *testing.T) {
 
 func TestCPUSamplesWithRandomTimestamps(t *testing.T) {
 	tc := testCase{
-		replicas:       3,
-		desiredValue:   3000,
-		targetResource: api.ResourceCPU,
+		replicas:        3,
+		desiredValue:    3000,
+		targetResource:  api.ResourceCPU,
+		targetTimestamp: 3,
 		reportedMetricsPoints: [][]metricPoint{
-			{{1, 1}, {3000, 3}, {2, 2}},
+			{{1, 1}, {3000, 5}, {2, 2}},
 			{{2, 2}, {1, 1}, {3000, 3}},
-			{{3000, 3}, {1, 1}, {2, 2}}},
+			{{3000, 4}, {1, 1}, {2, 2}}},
 	}
 	tc.runTest(t)
 }
 
 func TestMemorySamplesWithRandomTimestamps(t *testing.T) {
 	tc := testCase{
-		replicas:       3,
-		desiredValue:   3000,
-		targetResource: api.ResourceMemory,
+		replicas:        3,
+		desiredValue:    3000,
+		targetResource:  api.ResourceMemory,
+		targetTimestamp: 3,
 		reportedMetricsPoints: [][]metricPoint{
 			{{1, 1}, {3000, 3}, {2, 2}},
 			{{2, 2}, {1, 1}, {3000, 3}},