HorizontalPodAutoscaler API: removal of ResourceConsumption target, introduction of CPU request utilization & other cleanups.
Commit: df732f061a
Parent: 3e5f792f69
Committed by: Filip Grzadkowski
@@ -68,6 +68,31 @@ func (a *HorizontalController) Run(syncPeriod time.Duration) {
 	}, syncPeriod, util.NeverStop)
 }
 
+func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.HorizontalPodAutoscaler,
+	scale *extensions.Scale) (int, *int, error) {
+	if hpa.Spec.CPUUtilization == nil {
+		// If CPUTarget is not specified then we should return some default values.
+		// Since we always take the maximum number of replicas from all policies it is safe
+		// to just return 0.
+		return 0, nil, nil
+	}
+	currentReplicas := scale.Status.Replicas
+	currentUtilization, err := a.metricsClient.GetCPUUtilization(hpa.Spec.ScaleRef.Namespace, scale.Status.Selector)
+
+	// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
+	if err != nil {
+		a.eventRecorder.Event(&hpa, "FailedGetMetrics", err.Error())
+		return 0, nil, fmt.Errorf("failed to get cpu utilization: %v", err)
+	}
+
+	usageRatio := float64(*currentUtilization) / float64(hpa.Spec.CPUUtilization.TargetPercentage)
+	if math.Abs(1.0-usageRatio) > tolerance {
+		return int(math.Ceil(usageRatio * float64(currentReplicas))), currentUtilization, nil
+	} else {
+		return currentReplicas, currentUtilization, nil
+	}
+}
+
 func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodAutoscaler) error {
 	reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Namespace, hpa.Spec.ScaleRef.Name)
 
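The new helper applies a simple proportional rule: scale by the ratio of observed to target utilization, rounded up, and only when that ratio leaves the tolerance dead band. A minimal standalone sketch of the rule (the tolerance value here is an assumption for illustration; the real constant is defined elsewhere in the controller package):

	package main

	import (
		"fmt"
		"math"
	)

	const tolerance = 0.1 // assumed dead-band width for this sketch

	// desiredReplicas mirrors computeReplicasForCPUUtilization: scale
	// proportionally to currentUtilization/target, but leave the replica
	// count alone while the ratio stays within the tolerance band.
	func desiredReplicas(currentReplicas, currentUtilization, targetPercentage int) int {
		usageRatio := float64(currentUtilization) / float64(targetPercentage)
		if math.Abs(1.0-usageRatio) > tolerance {
			return int(math.Ceil(usageRatio * float64(currentReplicas)))
		}
		return currentReplicas
	}

	func main() {
		// 3 replicas at 50% average utilization against a 30% target:
		// usageRatio = 50/30 ≈ 1.67, so ceil(1.67 * 3) = 5 replicas.
		fmt.Println(desiredReplicas(3, 50, 30)) // 5
	}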
@@ -77,24 +102,18 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodAutoscaler) error {
 		return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
 	}
 	currentReplicas := scale.Status.Replicas
-	currentConsumption, err := a.metricsClient.
-		ResourceConsumption(hpa.Spec.ScaleRef.Namespace).
-		Get(hpa.Spec.Target.Resource, scale.Status.Selector)
-
-	// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
+	desiredReplicas, currentUtilization, err := a.computeReplicasForCPUUtilization(hpa, scale)
 	if err != nil {
-		a.eventRecorder.Event(&hpa, "FailedGetMetrics", err.Error())
-		return fmt.Errorf("failed to get metrics for %s: %v", reference, err)
+		a.eventRecorder.Event(&hpa, "FailedComputeReplicas", err.Error())
+		return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
 	}
 
-	usageRatio := float64(currentConsumption.Quantity.MilliValue()) / float64(hpa.Spec.Target.Quantity.MilliValue())
-	desiredReplicas := int(math.Ceil(usageRatio * float64(currentReplicas)))
-
-	if desiredReplicas < hpa.Spec.MinReplicas {
-		desiredReplicas = hpa.Spec.MinReplicas
+	if hpa.Spec.MinReplicas != nil && desiredReplicas < *hpa.Spec.MinReplicas {
+		desiredReplicas = *hpa.Spec.MinReplicas
 	}
 
-	// TODO: remove when pod ideling is done.
+	// TODO: remove when pod idling is done.
 	if desiredReplicas == 0 {
 		desiredReplicas = 1
 	}
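MinReplicas is now a pointer in the spec, so the lower bound is applied only when it is actually set. A small standalone sketch of the nil-guarded clamp (the upper bound shown here is an assumption about the surrounding context; the MaxReplicas check sits outside this hunk):

	package main

	import "fmt"

	// clamp applies the bounds the way the controller now does: skip the
	// lower clamp entirely when no minimum was specified.
	func clamp(desired int, min *int, max int) int {
		if min != nil && desired < *min {
			desired = *min
		}
		if desired > max {
			desired = max
		}
		return desired
	}

	func main() {
		two := 2
		fmt.Println(clamp(1, &two, 5)) // 2: raised to the minimum
		fmt.Println(clamp(1, nil, 5))  // 1: no minimum set
	}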
@@ -108,17 +127,17 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodAutoscaler) error {
 	if desiredReplicas != currentReplicas {
 		// Going down only if the usageRatio dropped significantly below the target
 		// and there was no rescaling in the last downscaleForbiddenWindow.
-		if desiredReplicas < currentReplicas && usageRatio < (1-tolerance) &&
-			(hpa.Status.LastScaleTimestamp == nil ||
-				hpa.Status.LastScaleTimestamp.Add(downscaleForbiddenWindow).Before(now)) {
+		if desiredReplicas < currentReplicas &&
+			(hpa.Status.LastScaleTime == nil ||
+				hpa.Status.LastScaleTime.Add(downscaleForbiddenWindow).Before(now)) {
 			rescale = true
 		}
 
 		// Going up only if the usage ratio increased significantly above the target
 		// and there was no rescaling in the last upscaleForbiddenWindow.
-		if desiredReplicas > currentReplicas && usageRatio > (1+tolerance) &&
-			(hpa.Status.LastScaleTimestamp == nil ||
-				hpa.Status.LastScaleTimestamp.Add(upscaleForbiddenWindow).Before(now)) {
+		if desiredReplicas > currentReplicas &&
+			(hpa.Status.LastScaleTime == nil ||
+				hpa.Status.LastScaleTime.Add(upscaleForbiddenWindow).Before(now)) {
 			rescale = true
 		}
 	}
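The tolerance check now lives in computeReplicasForCPUUtilization, so reconcileAutoscaler only enforces the cooldown: a rescale in either direction is allowed when the autoscaler has never scaled, or when the last scale fell outside the relevant forbidden window. A standalone sketch of that guard (the window length is an assumption for illustration):

	package main

	import (
		"fmt"
		"time"
	)

	const downscaleForbiddenWindow = 5 * time.Minute // assumed length

	// allowed mirrors the guard above: scaling is permitted if there was
	// no previous scale, or the previous scale is older than the window.
	func allowed(lastScaleTime *time.Time, now time.Time) bool {
		return lastScaleTime == nil || lastScaleTime.Add(downscaleForbiddenWindow).Before(now)
	}

	func main() {
		now := time.Now()
		recent := now.Add(-2 * time.Minute)
		fmt.Println(allowed(nil, now))     // true: never rescaled
		fmt.Println(allowed(&recent, now)) // false: still inside the window
	}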
@@ -131,20 +150,20 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodAutoscaler) error {
 			return fmt.Errorf("failed to rescale %s: %v", reference, err)
 		}
 		a.eventRecorder.Eventf(&hpa, "SuccessfulRescale", "New size: %d", desiredReplicas)
-		glog.Infof("Successful rescale of %s, old size: %d, new size: %d, usage ratio: %f",
-			hpa.Name, currentReplicas, desiredReplicas, usageRatio)
+		glog.Infof("Successful rescale of %s, old size: %d, new size: %d",
+			hpa.Name, currentReplicas, desiredReplicas)
 	} else {
 		desiredReplicas = currentReplicas
 	}
 
 	hpa.Status = extensions.HorizontalPodAutoscalerStatus{
-		CurrentReplicas:    currentReplicas,
-		DesiredReplicas:    desiredReplicas,
-		CurrentConsumption: currentConsumption,
+		CurrentReplicas:                 currentReplicas,
+		DesiredReplicas:                 desiredReplicas,
+		CurrentCPUUtilizationPercentage: currentUtilization,
 	}
 	if rescale {
 		now := unversioned.NewTime(now)
-		hpa.Status.LastScaleTimestamp = &now
+		hpa.Status.LastScaleTime = &now
 	}
 
 	_, err = a.client.Extensions().HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(&hpa)
@@ -58,12 +58,13 @@ type testCase struct {
 	maxReplicas     int
 	initialReplicas int
 	desiredReplicas int
-	targetResource  api.ResourceName
-	targetLevel     resource.Quantity
-	reportedLevels  []uint64
-	scaleUpdated    bool
-	eventCreated    bool
-	verifyEvents    bool
+	// CPU target utilization as a percentage of the requested resources.
+	CPUTarget           int
+	reportedLevels      []uint64
+	reportedCPURequests []resource.Quantity
+	scaleUpdated        bool
+	eventCreated        bool
+	verifyEvents        bool
 }
 
 func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
@@ -86,19 +87,21 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 				SelfLink: "experimental/v1/namespaces/" + namespace + "/horizontalpodautoscalers/" + hpaName,
 			},
 			Spec: extensions.HorizontalPodAutoscalerSpec{
-				ScaleRef: &extensions.SubresourceReference{
+				ScaleRef: extensions.SubresourceReference{
 					Kind:        "replicationController",
 					Name:        rcName,
 					Namespace:   namespace,
 					Subresource: "scale",
 				},
-				MinReplicas: tc.minReplicas,
+				MinReplicas: &tc.minReplicas,
 				MaxReplicas: tc.maxReplicas,
-				Target:      extensions.ResourceConsumption{Resource: tc.targetResource, Quantity: tc.targetLevel},
 			},
 		},
 	},
 }
+if tc.CPUTarget > 0.0 {
+	obj.Items[0].Spec.CPUUtilization = &extensions.CPUTargetUtilization{TargetPercentage: tc.CPUTarget}
+}
 return true, obj, nil
 })
@@ -121,7 +124,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 
 	fakeClient.AddReactor("list", "pods", func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
 		obj := &api.PodList{}
-		for i := 0; i < tc.initialReplicas; i++ {
+		for i := 0; i < len(tc.reportedCPURequests); i++ {
 			podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
 			pod := api.Pod{
 				Status: api.PodStatus{
@@ -134,6 +137,17 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 						"name": podNamePrefix,
 					},
 				},
+				Spec: api.PodSpec{
+					Containers: []api.Container{
+						{
+							Resources: api.ResourceRequirements{
+								Requests: api.ResourceList{
+									api.ResourceCPU: tc.reportedCPURequests[i],
+								},
+							},
+						},
+					},
+				},
 			}
 			obj.Items = append(obj.Items, pod)
 		}
@@ -202,160 +216,148 @@ func (tc *testCase) runTest(t *testing.T) {
 	tc.verifyResults(t)
 }
 
-func TestCPU(t *testing.T) {
-	tc := testCase{
-		minReplicas:     1,
-		maxReplicas:     5,
-		initialReplicas: 1,
-		desiredReplicas: 2,
-		targetResource:  api.ResourceCPU,
-		targetLevel:     resource.MustParse("0.1"),
-		reportedLevels:  []uint64{200},
-	}
-	tc.runTest(t)
-}
-
-func TestMemory(t *testing.T) {
-	tc := testCase{
-		minReplicas:     1,
-		maxReplicas:     5,
-		initialReplicas: 1,
-		desiredReplicas: 2,
-		targetResource:  api.ResourceMemory,
-		targetLevel:     resource.MustParse("1k"),
-		reportedLevels:  []uint64{2000},
-	}
-	tc.runTest(t)
-}
-
 func TestScaleUp(t *testing.T) {
 	tc := testCase{
-		minReplicas:     2,
-		maxReplicas:     6,
-		initialReplicas: 3,
-		desiredReplicas: 5,
-		targetResource:  api.ResourceMemory,
-		targetLevel:     resource.MustParse("3k"),
-		reportedLevels:  []uint64{3000, 5000, 7000},
+		minReplicas:         2,
+		maxReplicas:         6,
+		initialReplicas:     3,
+		desiredReplicas:     5,
+		CPUTarget:           30,
+		reportedLevels:      []uint64{300, 500, 700},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
 	}
 	tc.runTest(t)
 }
 
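The TestScaleUp expectations can be checked by hand against the new rule: the reported levels average to (300+500+700)/3 = 500 milli-cores against an average request of 1000m, i.e. 50% utilization. With a 30% target the usage ratio is 50/30 ≈ 1.67, and ceil(1.67 × 3) = 5, matching desiredReplicas and staying under maxReplicas.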
 func TestScaleDown(t *testing.T) {
 	tc := testCase{
-		minReplicas:     2,
-		maxReplicas:     6,
-		initialReplicas: 5,
-		desiredReplicas: 3,
-		targetResource:  api.ResourceCPU,
-		targetLevel:     resource.MustParse("0.5"),
-		reportedLevels:  []uint64{100, 300, 500, 250, 250},
+		minReplicas:         2,
+		maxReplicas:         6,
+		initialReplicas:     5,
+		desiredReplicas:     3,
+		CPUTarget:           50,
+		reportedLevels:      []uint64{100, 300, 500, 250, 250},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
 	}
 	tc.runTest(t)
 }
 
 func TestTolerance(t *testing.T) {
 	tc := testCase{
-		minReplicas:     1,
-		maxReplicas:     5,
-		initialReplicas: 3,
-		desiredReplicas: 3,
-		targetResource:  api.ResourceMemory,
-		targetLevel:     resource.MustParse("1k"),
-		reportedLevels:  []uint64{1010, 1030, 1020},
+		minReplicas:         1,
+		maxReplicas:         5,
+		initialReplicas:     3,
+		desiredReplicas:     3,
+		CPUTarget:           100,
+		reportedLevels:      []uint64{1010, 1030, 1020},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
 	}
 	tc.runTest(t)
 }
 
 func TestMinReplicas(t *testing.T) {
 	tc := testCase{
-		minReplicas:     2,
-		maxReplicas:     5,
-		initialReplicas: 3,
-		desiredReplicas: 2,
-		targetResource:  api.ResourceMemory,
-		targetLevel:     resource.MustParse("1k"),
-		reportedLevels:  []uint64{10, 95, 10},
+		minReplicas:         2,
+		maxReplicas:         5,
+		initialReplicas:     3,
+		desiredReplicas:     2,
+		CPUTarget:           90,
+		reportedLevels:      []uint64{10, 95, 10},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
 	}
 	tc.runTest(t)
 }
 
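TestMinReplicas also works out by hand: the reported levels average to (10+95+10)/3 = 38m against an average request of (900+1000+1100)/3 = 1000m, which truncates to 3% utilization. Against the 90% target the raw proposal is ceil((3/90) × 3) = 1 replica, which the controller then raises to the minReplicas floor of 2.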
 func TestMaxReplicas(t *testing.T) {
 	tc := testCase{
-		minReplicas:     2,
-		maxReplicas:     5,
-		initialReplicas: 3,
-		desiredReplicas: 5,
-		targetResource:  api.ResourceMemory,
-		targetLevel:     resource.MustParse("1k"),
-		reportedLevels:  []uint64{8000, 9500, 1000},
+		minReplicas:         2,
+		maxReplicas:         5,
+		initialReplicas:     3,
+		desiredReplicas:     5,
+		CPUTarget:           90,
+		reportedLevels:      []uint64{8000, 9500, 1000},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
 	}
 	tc.runTest(t)
 }
 
 func TestSuperfluousMetrics(t *testing.T) {
 	tc := testCase{
-		minReplicas:     2,
-		maxReplicas:     6,
-		initialReplicas: 4,
-		desiredReplicas: 4,
-		targetResource:  api.ResourceMemory,
-		targetLevel:     resource.MustParse("1k"),
-		reportedLevels:  []uint64{4000, 9500, 3000, 7000, 3200, 2000},
+		minReplicas:         2,
+		maxReplicas:         6,
+		initialReplicas:     4,
+		desiredReplicas:     4,
+		CPUTarget:           100,
+		reportedLevels:      []uint64{4000, 9500, 3000, 7000, 3200, 2000},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
 	}
 	tc.runTest(t)
 }
 
 func TestMissingMetrics(t *testing.T) {
 	tc := testCase{
-		minReplicas:     2,
-		maxReplicas:     6,
-		initialReplicas: 4,
-		desiredReplicas: 4,
-		targetResource:  api.ResourceMemory,
-		targetLevel:     resource.MustParse("1k"),
-		reportedLevels:  []uint64{400, 95},
+		minReplicas:         2,
+		maxReplicas:         6,
+		initialReplicas:     4,
+		desiredReplicas:     4,
+		CPUTarget:           100,
+		reportedLevels:      []uint64{400, 95},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
 	}
 	tc.runTest(t)
 }
 
 func TestEmptyMetrics(t *testing.T) {
 	tc := testCase{
-		minReplicas:     2,
-		maxReplicas:     6,
-		initialReplicas: 4,
-		desiredReplicas: 4,
-		targetResource:  api.ResourceMemory,
-		targetLevel:     resource.MustParse("1k"),
-		reportedLevels:  []uint64{},
+		minReplicas:         2,
+		maxReplicas:         6,
+		initialReplicas:     4,
+		desiredReplicas:     4,
+		CPUTarget:           100,
+		reportedLevels:      []uint64{},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
 	}
 	tc.runTest(t)
 }
 
+func TestEmptyCPURequest(t *testing.T) {
+	tc := testCase{
+		minReplicas:     1,
+		maxReplicas:     5,
+		initialReplicas: 1,
+		desiredReplicas: 1,
+		CPUTarget:       100,
+		reportedLevels:  []uint64{200},
+	}
+	tc.runTest(t)
+}
+
 func TestEventCreated(t *testing.T) {
 	tc := testCase{
-		minReplicas:     1,
-		maxReplicas:     5,
-		initialReplicas: 1,
-		desiredReplicas: 2,
-		targetResource:  api.ResourceCPU,
-		targetLevel:     resource.MustParse("0.1"),
-		reportedLevels:  []uint64{200},
-		verifyEvents:    true,
+		minReplicas:         1,
+		maxReplicas:         5,
+		initialReplicas:     1,
+		desiredReplicas:     2,
+		CPUTarget:           50,
+		reportedLevels:      []uint64{200},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("0.2")},
+		verifyEvents:        true,
 	}
 	tc.runTest(t)
 }
 
 func TestEventNotCreated(t *testing.T) {
 	tc := testCase{
-		minReplicas:     1,
-		maxReplicas:     5,
-		initialReplicas: 2,
-		desiredReplicas: 2,
-		targetResource:  api.ResourceCPU,
-		targetLevel:     resource.MustParse("0.2"),
-		reportedLevels:  []uint64{200, 200},
-		verifyEvents:    true,
+		minReplicas:         1,
+		maxReplicas:         5,
+		initialReplicas:     2,
+		desiredReplicas:     2,
+		CPUTarget:           50,
+		reportedLevels:      []uint64{200, 200},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("0.4"), resource.MustParse("0.4")},
+		verifyEvents:        true,
 	}
 	tc.runTest(t)
 }
 
 // TODO: add more tests
 
@@ -25,7 +25,6 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/resource"
-	"k8s.io/kubernetes/pkg/apis/extensions"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
@@ -40,85 +39,110 @@ const (
 
 var heapsterQueryStart = -5 * time.Minute
 
-// An interface for getting metrics for pods.
+// MetricsClient is an interface for getting metrics for pods.
 type MetricsClient interface {
-	ResourceConsumption(namespace string) ResourceConsumptionClient
+	// GetCPUUtilization returns average utilization over all pods
+	// represented as a percent of requested CPU, e.g. 70 means that
+	// an average pod uses 70% of the requested CPU.
+	GetCPUUtilization(namespace string, selector map[string]string) (*int, error)
 }
 
-type ResourceConsumptionClient interface {
-	// Gets average resource consumption for pods under the given selector.
-	Get(resourceName api.ResourceName, selector map[string]string) (*extensions.ResourceConsumption, error)
+// ResourceConsumption specifies consumption of a particular resource.
+type ResourceConsumption struct {
+	Resource api.ResourceName
+	Quantity resource.Quantity
 }
 
 // Aggregates results into ResourceConsumption. Also returns number of
 // pods included in the aggregation.
-type metricAggregator func(heapster.MetricResultList) (extensions.ResourceConsumption, int)
+type metricAggregator func(heapster.MetricResultList) (ResourceConsumption, int)
 
 type metricDefinition struct {
 	name       string
 	aggregator metricAggregator
 }
 
-// Heapster-based implementation of MetricsClient
+// HeapsterMetricsClient is a Heapster-based implementation of MetricsClient.
 type HeapsterMetricsClient struct {
-	client client.Interface
-}
-
-type HeapsterResourceConsumptionClient struct {
-	namespace           string
 	client              client.Interface
 	resourceDefinitions map[api.ResourceName]metricDefinition
 }
 
-func NewHeapsterMetricsClient(client client.Interface) *HeapsterMetricsClient {
-	return &HeapsterMetricsClient{client: client}
-}
-
+var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
 	api.ResourceCPU: {"cpu-usage",
-		func(metrics heapster.MetricResultList) (extensions.ResourceConsumption, int) {
+		func(metrics heapster.MetricResultList) (ResourceConsumption, int) {
 			sum, count := calculateSumFromLatestSample(metrics)
 			value := "0"
 			if count > 0 {
 				// assumes that cpu usage is in millis
 				value = fmt.Sprintf("%dm", sum/uint64(count))
 			}
-			return extensions.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count
+			return ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count
 		}},
 	api.ResourceMemory: {"memory-usage",
-		func(metrics heapster.MetricResultList) (extensions.ResourceConsumption, int) {
+		func(metrics heapster.MetricResultList) (ResourceConsumption, int) {
 			sum, count := calculateSumFromLatestSample(metrics)
 			value := int64(0)
 			if count > 0 {
 				value = int64(sum) / int64(count)
 			}
-			return extensions.ResourceConsumption{Resource: api.ResourceMemory, Quantity: *resource.NewQuantity(value, resource.DecimalSI)}, count
+			return ResourceConsumption{Resource: api.ResourceMemory, Quantity: *resource.NewQuantity(value, resource.DecimalSI)}, count
 		}},
 }
 
-func (h *HeapsterMetricsClient) ResourceConsumption(namespace string) ResourceConsumptionClient {
-	return &HeapsterResourceConsumptionClient{
-		namespace: namespace,
-		client:    h.client,
+// NewHeapsterMetricsClient returns a new instance of the Heapster-based implementation of the MetricsClient interface.
+func NewHeapsterMetricsClient(client client.Interface) *HeapsterMetricsClient {
+	return &HeapsterMetricsClient{
+		client:              client,
 		resourceDefinitions: heapsterMetricDefinitions,
 	}
 }
 
-func (h *HeapsterResourceConsumptionClient) Get(resourceName api.ResourceName, selector map[string]string) (*extensions.ResourceConsumption, error) {
-	podList, err := h.client.Pods(h.namespace).
+func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector map[string]string) (*int, error) {
+	consumption, request, err := h.GetResourceConsumptionAndRequest(api.ResourceCPU, namespace, selector)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get CPU consumption and request: %v", err)
+	}
+	utilization := new(int)
+	*utilization = int(float64(consumption.Quantity.MilliValue()) / float64(request.MilliValue()) * 100)
+	return utilization, nil
+}
+
+func (h *HeapsterMetricsClient) GetResourceConsumptionAndRequest(resourceName api.ResourceName, namespace string, selector map[string]string) (consumption *ResourceConsumption, request *resource.Quantity, err error) {
+	podList, err := h.client.Pods(namespace).
 		List(labels.SelectorFromSet(labels.Set(selector)), fields.Everything())
 
 	if err != nil {
-		return nil, fmt.Errorf("failed to get pod list: %v", err)
+		return nil, nil, fmt.Errorf("failed to get pod list: %v", err)
 	}
 	podNames := []string{}
+	sum := resource.MustParse("0")
+	missing := false
 	for _, pod := range podList.Items {
 		podNames = append(podNames, pod.Name)
+		for _, container := range pod.Spec.Containers {
+			containerRequest := container.Resources.Requests[resourceName]
+			if containerRequest.Amount != nil {
+				sum.Add(containerRequest)
+			} else {
+				missing = true
+			}
+		}
 	}
-	return h.getForPods(resourceName, podNames)
+	if missing || sum.Cmp(resource.MustParse("0")) == 0 {
+		return nil, nil, fmt.Errorf("some pods do not have request for %s", resourceName)
+	}
+	glog.Infof("Sum of %s requested: %v", resourceName, sum)
+	avg := resource.MustParse(fmt.Sprintf("%dm", sum.MilliValue()/int64(len(podList.Items))))
+	request = &avg
+	consumption, err = h.getForPods(resourceName, namespace, podNames)
+	if err != nil {
+		return nil, nil, err
+	}
+	return consumption, request, nil
 }
 
-func (h *HeapsterResourceConsumptionClient) getForPods(resourceName api.ResourceName, podNames []string) (*extensions.ResourceConsumption, error) {
+func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namespace string, podNames []string) (*ResourceConsumption, error) {
 	metricSpec, metricDefined := h.resourceDefinitions[resourceName]
 	if !metricDefined {
 		return nil, fmt.Errorf("heapster metric not defined for %v", resourceName)
@@ -127,7 +151,7 @@ func (h *HeapsterResourceConsumptionClient) getForPods(resourceName api.ResourceName, podNames []string) (*extensions.ResourceConsumption, error) {
 
 	startTime := now.Add(heapsterQueryStart)
 	metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s",
-		h.namespace,
+		namespace,
 		strings.Join(podNames, ","),
 		metricSpec.name)
 
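GetCPUUtilization reduces the two aggregates to a single percentage: average consumption divided by average request, both in milli-CPU. A standalone sketch of that arithmetic:

	package main

	import "fmt"

	// utilizationPercent mirrors the division in GetCPUUtilization:
	// milli-CPU consumed by the average pod over milli-CPU requested.
	func utilizationPercent(consumedMilli, requestedMilli int64) int {
		return int(float64(consumedMilli) / float64(requestedMilli) * 100)
	}

	func main() {
		// An average pod consuming 700m of a 1000m request runs at 70%,
		// the example given in the interface comment.
		fmt.Println(utilizationPercent(700, 1000)) // 70
	}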
@@ -25,7 +25,7 @@ import (
 
 	"k8s.io/kubernetes/pkg/api"
 	_ "k8s.io/kubernetes/pkg/api/latest"
-	"k8s.io/kubernetes/pkg/apis/extensions"
+	"k8s.io/kubernetes/pkg/api/resource"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
 	"k8s.io/kubernetes/pkg/runtime"
@@ -81,14 +81,25 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 	for i := 0; i < tc.replicas; i++ {
 		podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
 		pod := api.Pod{
-			Status: api.PodStatus{
-				Phase: api.PodRunning,
-			},
 			ObjectMeta: api.ObjectMeta{
 				Name:      podName,
 				Namespace: namespace,
 				Labels:    selector,
 			},
+			Spec: api.PodSpec{
+				Containers: []api.Container{
+					{
+						Resources: api.ResourceRequirements{
+							Requests: api.ResourceList{
+								tc.targetResource: resource.MustParse("10"),
+							},
+						},
+					},
+				},
+			},
+			Status: api.PodStatus{
+				Phase: api.PodRunning,
+			},
 		}
 		obj.Items = append(obj.Items, pod)
 	}
@@ -122,7 +133,7 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 	return fakeClient
 }
 
-func (tc *testCase) verifyResults(t *testing.T, val *extensions.ResourceConsumption, err error) {
+func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, err error) {
 	assert.Equal(t, tc.desiredError, err)
 	if tc.desiredError != nil {
 		return
@@ -138,7 +149,7 @@ func (tc *testCase) verifyResults(t *testing.T, val *extensions.ResourceConsumption, err error) {
 func (tc *testCase) runTest(t *testing.T) {
 	testClient := tc.prepareTestClient(t)
 	metricsClient := NewHeapsterMetricsClient(testClient)
-	val, err := metricsClient.ResourceConsumption(tc.namespace).Get(tc.targetResource, tc.selector)
+	val, _, err := metricsClient.GetResourceConsumptionAndRequest(tc.targetResource, tc.namespace, tc.selector)
 	tc.verifyResults(t, val, err)
 }
 
@@ -331,7 +342,7 @@ func TestCPUZeroReplicas(t *testing.T) {
 	tc := testCase{
 		replicas:              0,
 		targetResource:        api.ResourceCPU,
-		desiredValue:          0,
+		desiredError:          fmt.Errorf("some pods do not have request for cpu"),
 		reportedMetricsPoints: [][]metricPoint{},
 	}
 	tc.runTest(t)
@@ -341,7 +352,7 @@ func TestMemoryZeroReplicas(t *testing.T) {
 	tc := testCase{
 		replicas:              0,
 		targetResource:        api.ResourceMemory,
-		desiredValue:          0,
+		desiredError:          fmt.Errorf("some pods do not have request for memory"),
 		reportedMetricsPoints: [][]metricPoint{},
 	}
 	tc.runTest(t)
@@ -366,3 +377,5 @@ func TestMemoryEmptyMetricsForOnePod(t *testing.T) {
 	}
 	tc.runTest(t)
 }
+
+// TODO: add proper tests for request