HPA Controller: Use Custom Metrics API

This commit switches the HPA controller over to the custom metrics
API.  It also converts the HPA controller to use the generated client
in k8s.io/metrics for the resource metrics API.

To enable support, you must enable
`--horizontal-pod-autoscaler-use-rest-clients` on the
controller manager, which switches the HPA controller's MetricsClient
implementation over to the standard REST clients for both custom
metrics and resource metrics.  This requires, at a minimum, that the
resource metrics API is registered with kube-aggregator and that the
controller manager is pointed at kube-aggregator.  For this to work,
Heapster must be serving the new-style API server (`--api-server=true`).
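
For context, the updated test below builds the HPA's MetricsClient from the
two generated fake clients instead of a fake Heapster proxy.  A minimal
sketch of that wiring, using only the package aliases and calls that appear
in the diff (the helper name and the metrics.MetricsClient return type are
illustrative assumptions, not part of this commit):

    package podautoscaler

    import (
        "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"

        metricsfake "k8s.io/metrics/pkg/client/clientset_generated/clientset/fake"
        cmfake "k8s.io/metrics/pkg/client/custom_metrics/fake"
    )

    // newFakeRESTMetricsClient mirrors the wiring in runTest: the fake
    // resource metrics and custom metrics clients stand in for the real
    // generated clients that the controller manager builds when
    // --horizontal-pod-autoscaler-use-rest-clients is enabled.
    func newFakeRESTMetricsClient() metrics.MetricsClient {
        testMetricsClient := &metricsfake.Clientset{}
        testCMClient := &cmfake.FakeCustomMetricsClient{}
        return metrics.NewRESTMetricsClient(
            testMetricsClient.MetricsV1alpha1(),
            testCMClient,
        )
    }

The fake clients let the tests assert on the actions they receive (the
"podmetricses" list reactor and the get-for custom metrics reactor below)
without a running Heapster.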
Author: Solly Ross
Date:   2017-02-20 01:17:16 -05:00
Parent: 2249550b57
Commit: d6fe1e8764

16 changed files with 2630 additions and 205 deletions


@@ -17,12 +17,8 @@ limitations under the License.
package podautoscaler
import (
"encoding/json"
"fmt"
"io"
"math"
"strconv"
"strings"
"sync"
"testing"
"time"
@@ -30,12 +26,12 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
clientfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/pkg/api"
clientv1 "k8s.io/client-go/pkg/api/v1"
restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
autoscalingv2 "k8s.io/kubernetes/pkg/apis/autoscaling/v2alpha1"
@@ -44,9 +40,11 @@ import (
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
metricsfake "k8s.io/metrics/pkg/client/clientset_generated/clientset/fake"
cmfake "k8s.io/metrics/pkg/client/custom_metrics/fake"
heapster "k8s.io/heapster/metrics/api/v1/types"
metricsapi "k8s.io/heapster/metrics/apis/metrics/v1alpha1"
cmapi "k8s.io/metrics/pkg/apis/custom_metrics/v1alpha1"
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1alpha1"
"github.com/stretchr/testify/assert"
@@ -56,22 +54,6 @@ import (
func alwaysReady() bool { return true }
func (w fakeResponseWrapper) DoRaw() ([]byte, error) {
return w.raw, nil
}
func (w fakeResponseWrapper) Stream() (io.ReadCloser, error) {
return nil, nil
}
func newFakeResponseWrapper(raw []byte) fakeResponseWrapper {
return fakeResponseWrapper{raw: raw}
}
type fakeResponseWrapper struct {
raw []byte
}
type fakeResource struct {
name string
apiVersion string
@@ -124,7 +106,7 @@ func (tc *testCase) computeCPUCurrent() {
tc.CPUCurrent = int32(100 * reported / requested)
}
func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
func (tc *testCase) prepareTestClient(t *testing.T) (*fake.Clientset, *metricsfake.Clientset, *cmfake.FakeCustomMetricsClient) {
namespace := "test-namespace"
hpaName := "test-hpa"
podNamePrefix := "test-pod"
@@ -323,79 +305,6 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
return true, obj, nil
})
fakeClient.AddProxyReactor("services", func(action core.Action) (handled bool, ret restclient.ResponseWrapper, err error) {
tc.Lock()
defer tc.Unlock()
var heapsterRawMemResponse []byte
if tc.useMetricsApi {
metrics := metricsapi.PodMetricsList{}
for i, cpu := range tc.reportedLevels {
podMetric := metricsapi.PodMetrics{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", podNamePrefix, i),
Namespace: namespace,
},
Timestamp: unversioned.Time{Time: time.Now()},
Containers: []metricsapi.ContainerMetrics{
{
Name: "container",
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(
int64(cpu),
resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(
int64(1024*1024),
resource.BinarySI),
},
},
},
}
metrics.Items = append(metrics.Items, podMetric)
}
heapsterRawMemResponse, _ = json.Marshal(&metrics)
} else {
// only return the pods that we actually asked for
proxyAction := action.(core.ProxyGetAction)
pathParts := strings.Split(proxyAction.GetPath(), "/")
// pathParts should look like [ api, v1, model, namespaces, $NS, pod-list, $PODS, metrics, $METRIC... ]
if len(pathParts) < 9 {
return true, nil, fmt.Errorf("invalid heapster path %q", proxyAction.GetPath())
}
podNames := strings.Split(pathParts[7], ",")
podPresent := make([]bool, len(tc.reportedLevels))
for _, name := range podNames {
if len(name) <= len(podNamePrefix)+1 {
return true, nil, fmt.Errorf("unknown pod %q", name)
}
num, err := strconv.Atoi(name[len(podNamePrefix)+1:])
if err != nil {
return true, nil, fmt.Errorf("unknown pod %q", name)
}
podPresent[num] = true
}
timestamp := time.Now()
metrics := heapster.MetricResultList{}
for i, level := range tc.reportedLevels {
if !podPresent[i] {
continue
}
metric := heapster.MetricResult{
Metrics: []heapster.MetricPoint{{Timestamp: timestamp, Value: level, FloatValue: nil}},
LatestTimestamp: timestamp,
}
metrics.Items = append(metrics.Items, metric)
}
heapsterRawMemResponse, _ = json.Marshal(&metrics)
}
return true, newFakeResponseWrapper(heapsterRawMemResponse), nil
})
fakeClient.AddReactor("update", "replicationcontrollers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
@@ -450,7 +359,116 @@ func (tc *testCase) prepareTestClient(t *testing.T) *fake.Clientset {
fakeWatch := watch.NewFake()
fakeClient.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
return fakeClient
fakeMetricsClient := &metricsfake.Clientset{}
// NB: we have to sound like Gollum due to gengo's inability to handle already-plural resource names
fakeMetricsClient.AddReactor("list", "podmetricses", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
metrics := &metricsapi.PodMetricsList{}
for i, cpu := range tc.reportedLevels {
// NB: the list reactor actually does label selector filtering for us,
// so we have to make sure our results match the label selector
podMetric := metricsapi.PodMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", podNamePrefix, i),
Namespace: namespace,
Labels: selector,
},
Timestamp: metav1.Time{Time: time.Now()},
Containers: []metricsapi.ContainerMetrics{
{
Name: "container",
Usage: clientv1.ResourceList{
clientv1.ResourceCPU: *resource.NewMilliQuantity(
int64(cpu),
resource.DecimalSI),
clientv1.ResourceMemory: *resource.NewQuantity(
int64(1024*1024),
resource.BinarySI),
},
},
},
}
metrics.Items = append(metrics.Items, podMetric)
}
return true, metrics, nil
})
fakeCMClient := &cmfake.FakeCustomMetricsClient{}
fakeCMClient.AddReactor("get", "*", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()
getForAction, wasGetFor := action.(cmfake.GetForAction)
if !wasGetFor {
return true, nil, fmt.Errorf("expected a get-for action, got %v instead", action)
}
if getForAction.GetName() == "*" {
metrics := &cmapi.MetricValueList{}
// multiple objects
assert.Equal(t, "pods", getForAction.GetResource().Resource, "the type of object that we requested multiple metrics for should have been pods")
assert.Equal(t, "qps", getForAction.GetMetricName(), "the metric name requested should have been qps, as specified in the metric spec")
for i, level := range tc.reportedLevels {
podMetric := cmapi.MetricValue{
DescribedObject: clientv1.ObjectReference{
Kind: "Pod",
Name: fmt.Sprintf("%s-%d", podNamePrefix, i),
Namespace: namespace,
},
Timestamp: metav1.Time{Time: time.Now()},
MetricName: "qps",
Value: *resource.NewMilliQuantity(int64(level), resource.DecimalSI),
}
metrics.Items = append(metrics.Items, podMetric)
}
return true, metrics, nil
} else {
name := getForAction.GetName()
mapper := api.Registry.RESTMapper()
metrics := &cmapi.MetricValueList{}
var matchedTarget *autoscalingv2.MetricSpec
for i, target := range tc.metricsTarget {
if target.Type == autoscalingv2.ObjectMetricSourceType && name == target.Object.Target.Name {
gk := schema.FromAPIVersionAndKind(target.Object.Target.APIVersion, target.Object.Target.Kind).GroupKind()
mapping, err := mapper.RESTMapping(gk)
if err != nil {
t.Logf("unable to get mapping for %s: %v", gk.String(), err)
continue
}
groupResource := schema.GroupResource{Group: mapping.GroupVersionKind.Group, Resource: mapping.Resource}
if getForAction.GetResource().Resource == groupResource.String() {
matchedTarget = &tc.metricsTarget[i]
}
}
}
assert.NotNil(t, matchedTarget, "this request should have matched one of the metric specs")
assert.Equal(t, "qps", getForAction.GetMetricName(), "the metric name requested should have been qps, as specified in the metric spec")
metrics.Items = []cmapi.MetricValue{
{
DescribedObject: clientv1.ObjectReference{
Kind: matchedTarget.Object.Target.Kind,
APIVersion: matchedTarget.Object.Target.APIVersion,
Name: name,
},
Timestamp: metav1.Time{Time: time.Now()},
MetricName: "qps",
Value: *resource.NewMilliQuantity(int64(tc.reportedLevels[0]), resource.DecimalSI),
},
}
return true, metrics, nil
}
})
return fakeClient, fakeMetricsClient, fakeCMClient
}
func (tc *testCase) verifyResults(t *testing.T) {
@@ -465,8 +483,11 @@ func (tc *testCase) verifyResults(t *testing.T) {
}
func (tc *testCase) runTest(t *testing.T) {
testClient := tc.prepareTestClient(t)
metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)
testClient, testMetricsClient, testCMClient := tc.prepareTestClient(t)
metricsClient := metrics.NewRESTMetricsClient(
testMetricsClient.MetricsV1alpha1(),
testCMClient,
)
eventClient := &clientfake.Clientset{}
eventClient.AddReactor("*", "events", func(action core.Action) (handled bool, ret runtime.Object, err error) {
@@ -631,7 +652,7 @@ func TestScaleUpCM(t *testing.T) {
},
},
},
reportedLevels: []uint64{20, 10, 30},
reportedLevels: []uint64{20000, 10000, 30000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
}
tc.runTest(t)
@@ -653,7 +674,7 @@ func TestScaleUpCMUnreadyLessScale(t *testing.T) {
},
},
},
reportedLevels: []uint64{50, 10, 30},
reportedLevels: []uint64{50000, 10000, 30000},
reportedPodReadiness: []v1.ConditionStatus{v1.ConditionTrue, v1.ConditionTrue, v1.ConditionFalse},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
}
@@ -676,13 +697,39 @@ func TestScaleUpCMUnreadyNoScaleWouldScaleDown(t *testing.T) {
},
},
},
reportedLevels: []uint64{50, 15, 30},
reportedLevels: []uint64{50000, 15000, 30000},
reportedPodReadiness: []v1.ConditionStatus{v1.ConditionFalse, v1.ConditionTrue, v1.ConditionFalse},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
}
tc.runTest(t)
}
func TestScaleUpCMObject(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 3,
desiredReplicas: 4,
CPUTarget: 0,
metricsTarget: []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.ObjectMetricSourceType,
Object: &autoscalingv2.ObjectMetricSource{
Target: autoscalingv2.CrossVersionObjectReference{
APIVersion: "extensions/v1beta1",
Kind: "Deployment",
Name: "some-deployment",
},
MetricName: "qps",
TargetValue: resource.MustParse("15.0"),
},
},
},
reportedLevels: []uint64{20000},
}
tc.runTest(t)
}
func TestScaleDown(t *testing.T) {
tc := testCase{
minReplicas: 2,
@@ -714,7 +761,34 @@ func TestScaleDownCM(t *testing.T) {
},
},
},
reportedLevels: []uint64{12, 12, 12, 12, 12},
reportedLevels: []uint64{12000, 12000, 12000, 12000, 12000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
}
tc.runTest(t)
}
func TestScaleDownCMObject(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 5,
desiredReplicas: 3,
CPUTarget: 0,
metricsTarget: []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.ObjectMetricSourceType,
Object: &autoscalingv2.ObjectMetricSource{
Target: autoscalingv2.CrossVersionObjectReference{
APIVersion: "extensions/v1beta1",
Kind: "Deployment",
Name: "some-deployment",
},
MetricName: "qps",
TargetValue: resource.MustParse("20.0"),
},
},
},
reportedLevels: []uint64{12000},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
}
tc.runTest(t)
@@ -766,7 +840,33 @@ func TestToleranceCM(t *testing.T) {
},
},
},
reportedLevels: []uint64{20, 21, 21},
reportedLevels: []uint64{20000, 20001, 21000},
reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
}
tc.runTest(t)
}
func TestToleranceCMObject(t *testing.T) {
tc := testCase{
minReplicas: 1,
maxReplicas: 5,
initialReplicas: 3,
desiredReplicas: 3,
metricsTarget: []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.ObjectMetricSourceType,
Object: &autoscalingv2.ObjectMetricSource{
Target: autoscalingv2.CrossVersionObjectReference{
APIVersion: "extensions/v1beta1",
Kind: "Deployment",
Name: "some-deployment",
},
MetricName: "qps",
TargetValue: resource.MustParse("20.0"),
},
},
},
reportedLevels: []uint64{20050},
reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
}
tc.runTest(t)