Merge pull request #14034 from jszczepkowski/hpa-unittest

Unittests for horizontal pod autoscaler controller.
This commit is contained in:
Piotr Szczesniak 2015-09-17 13:53:29 +02:00
commit 81d3bd9a36
5 changed files with 374 additions and 161 deletions

View File

@@ -77,6 +77,5 @@ func (c *FakeServices) Watch(label labels.Selector, field fields.Selector, resou
}
func (c *FakeServices) ProxyGet(name, path string, params map[string]string) unversioned.ResponseWrapper {
c.Fake.Invokes(NewProxyGetAction("services", c.Namespace, name, path, params), nil)
return nil
return c.Fake.InvokesProxy(NewProxyGetAction("services", c.Namespace, name, path, params))
}

View File

@@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/yaml"
"k8s.io/kubernetes/pkg/watch"
@@ -260,7 +261,7 @@ func (r *SimpleReactor) React(action Action) (bool, runtime.Object, error) {
return r.Reaction(action)
}
// SimpleWatchReactor is a Reactor. Each reaction function is attached to a given verb,resource tuple. "*" in either field matches everything for that value.
// SimpleWatchReactor is a WatchReactor. Each reaction function is attached to a given resource. "*" matches everything for that value.
// For instance, *,pods matches all verbs on pods. This allows for easier composition of reaction functions
type SimpleWatchReactor struct {
Resource string
@@ -280,3 +281,24 @@ func (r *SimpleWatchReactor) Handles(action Action) bool {
func (r *SimpleWatchReactor) React(action Action) (bool, watch.Interface, error) {
return r.Reaction(action)
}
// SimpleProxyReactor is a ProxyReactor. The reaction function is attached to a
// given resource; "*" matches every resource. This allows for easier
// composition of reaction functions.
type SimpleProxyReactor struct {
	Resource string
	Reaction ProxyReactionFunc
}

// Handles reports whether this reactor covers the action's resource
// (exact match, or the "*" wildcard).
func (r *SimpleProxyReactor) Handles(action Action) bool {
	return r.Resource == "*" || r.Resource == action.GetResource()
}

// React delegates to the attached reaction function.
func (r *SimpleProxyReactor) React(action Action) (bool, client.ResponseWrapper, error) {
	return r.Reaction(action)
}

View File

@@ -54,6 +54,8 @@ type Fake struct {
ReactionChain []Reactor
// WatchReactionChain is the list of watch reactors that will be attempted for every request in the order they are tried
WatchReactionChain []WatchReactor
// ProxyReactionChain is the list of proxy reactors that will be attempted for every request in the order they are tried
ProxyReactionChain []ProxyReactor
}
// Reactor is an interface to allow the composition of reaction functions.
@@ -72,6 +74,14 @@ type WatchReactor interface {
React(action Action) (handled bool, ret watch.Interface, err error)
}
// ProxyReactor is an interface to allow the composition of proxy get functions.
type ProxyReactor interface {
// Handles indicates whether or not this Reactor deals with a given action
Handles(action Action) bool
// React handles a proxy action and returns results. It may choose to delegate by indicating handled=false.
React(action Action) (handled bool, ret client.ResponseWrapper, err error)
}
// ReactionFunc is a function that returns an object or error for a given Action. If "handled" is false,
// then the test client will continue ignore the results and continue to the next ReactionFunc
type ReactionFunc func(action Action) (handled bool, ret runtime.Object, err error)
@@ -80,6 +90,10 @@ type ReactionFunc func(action Action) (handled bool, ret runtime.Object, err err
// then the test client will continue ignore the results and continue to the next ReactionFunc
type WatchReactionFunc func(action Action) (handled bool, ret watch.Interface, err error)
// ProxyReactionFunc is a function that returns a ResponseWrapper interface for a given Action. If "handled" is false,
// then the test client will ignore the results and continue to the next ProxyReactionFunc.
type ProxyReactionFunc func(action Action) (handled bool, ret client.ResponseWrapper, err error)
// AddReactor appends a reactor to the end of the chain
func (c *Fake) AddReactor(verb, resource string, reaction ReactionFunc) {
c.ReactionChain = append(c.ReactionChain, &SimpleReactor{verb, resource, reaction})
@@ -97,6 +111,11 @@ func (c *Fake) AddWatchReactor(resource string, reaction WatchReactionFunc) {
c.WatchReactionChain = append(c.WatchReactionChain, &SimpleWatchReactor{resource, reaction})
}
// AddProxyReactor registers a proxy reactor for the given resource at the end
// of the proxy reaction chain.
func (c *Fake) AddProxyReactor(resource string, reaction ProxyReactionFunc) {
	reactor := &SimpleProxyReactor{Resource: resource, Reaction: reaction}
	c.ProxyReactionChain = append(c.ProxyReactionChain, reactor)
}
// Invokes records the provided Action and then invokes the ReactFn (if provided).
// defaultReturnObj is expected to be of the same type a normal call would return.
func (c *Fake) Invokes(action Action, defaultReturnObj runtime.Object) (runtime.Object, error) {
@@ -142,6 +161,28 @@ func (c *Fake) InvokesWatch(action Action) (watch.Interface, error) {
return nil, fmt.Errorf("unhandled watch: %#v", action)
}
// InvokesProxy records the provided Action and then invokes the ReactFn (if provided).
func (c *Fake) InvokesProxy(action Action) client.ResponseWrapper {
// Serialize recording and dispatch: c.actions is shared test state.
c.Lock()
defer c.Unlock()
c.actions = append(c.actions, action)
// Walk the proxy reactor chain in registration order; the first reactor that
// both handles the action and reacts successfully supplies the response.
for _, reactor := range c.ProxyReactionChain {
if !reactor.Handles(action) {
continue
}
handled, ret, err := reactor.React(action)
if !handled || err != nil {
// NOTE(review): a reactor error is silently dropped and the chain keeps
// going — there is no error return on this method. Confirm intended.
continue
}
return ret
}
// No reactor handled the action.
return nil
}
// ClearActions clears the history of actions called on the fake client
func (c *Fake) ClearActions() {
c.Lock()

View File

@@ -79,8 +79,9 @@ func (a *HorizontalController) reconcileAutoscaler(hpa experimental.HorizontalPo
return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
}
currentReplicas := scale.Status.Replicas
currentConsumption, err := a.metricsClient.ResourceConsumption(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.Target.Resource,
scale.Status.Selector)
currentConsumption, err := a.metricsClient.
ResourceConsumption(hpa.Spec.ScaleRef.Namespace).
Get(hpa.Spec.Target.Resource, scale.Status.Selector)
// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
if err != nil {

View File

@@ -17,111 +17,73 @@ limitations under the License.
package podautoscaler
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"io"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
_ "k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/apis/experimental"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"github.com/golang/glog"
heapster "k8s.io/heapster/api/v1/types"
"github.com/stretchr/testify/assert"
)
const (
namespace = api.NamespaceDefault
rcName = "app-rc"
podNameLabel = "app"
hpaName = "foo"
hpaListHandler = "HpaList"
scaleHandler = "Scale"
updateHpaHandler = "HpaUpdate"
eventHandler = "Event"
)
type serverResponse struct {
statusCode int
obj interface{}
// DoRaw returns the canned response bytes; it never fails.
func (w fakeResponseWrapper) DoRaw() ([]byte, error) {
return w.raw, nil
}
type fakeMetricsClient struct {
consumption metrics.ResourceConsumptionClient
// Stream is not exercised by these tests and returns a nil ReadCloser.
// NOTE(review): a caller that actually streamed would dereference nil — confirm unused.
func (w fakeResponseWrapper) Stream() (io.ReadCloser, error) {
return nil, nil
}
type fakeResourceConsumptionClient struct {
metrics map[api.ResourceName]experimental.ResourceConsumption
// newFakeResponseWrapper wraps raw bytes so the fake proxy reactor can serve
// them through the client.ResponseWrapper interface.
func newFakeResponseWrapper(raw []byte) fakeResponseWrapper {
return fakeResponseWrapper{raw: raw}
}
func (f *fakeMetricsClient) ResourceConsumption(namespace string) metrics.ResourceConsumptionClient {
return f.consumption
// fakeResponseWrapper is a stub client.ResponseWrapper returning fixed bytes.
type fakeResponseWrapper struct {
raw []byte
}
func (f *fakeResourceConsumptionClient) Get(resource api.ResourceName, selector map[string]string) (*experimental.ResourceConsumption, error) {
consumption, found := f.metrics[resource]
if !found {
return nil, fmt.Errorf("resource not found: %v", resource)
}
return &consumption, nil
// testCase describes one autoscaler reconciliation scenario and records what
// the fake client observed while it ran.
type testCase struct {
// minCount and maxCount are the HPA replica bounds.
minCount int
maxCount int
// initialReplicas is what the fake scale subresource reports;
// desiredReplicas is the count the controller is expected to compute.
initialReplicas int
desiredReplicas int
// targetResource and targetLevel define the HPA consumption target.
targetResource api.ResourceName
targetLevel resource.Quantity
// reportedLevels are the per-pod metric values served by the fake heapster proxy.
reportedLevels []uint64
// scaleUpdated and eventCreated are set by the fake client's reactors.
scaleUpdated bool
eventCreated bool
// verifyEvents additionally asserts on event content (and waits for broadcast).
verifyEvents bool
}
func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) {
func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
namespace := "test-namespace"
hpaName := "test-hpa"
rcName := "test-rc"
podNamePrefix := "test-pod"
handlers := map[string]*util.FakeHandler{}
mux := http.NewServeMux()
tc.scaleUpdated = false
tc.eventCreated = false
mkHandler := func(url string, response serverResponse) *util.FakeHandler {
handler := util.FakeHandler{
StatusCode: response.statusCode,
ResponseBody: runtime.EncodeOrDie(testapi.Experimental.Codec(), response.obj.(runtime.Object)),
}
mux.Handle(url, &handler)
glog.Infof("Will handle %s", url)
return &handler
}
if responses[hpaListHandler] != nil {
handlers[hpaListHandler] = mkHandler("/apis/experimental/v1/horizontalpodautoscalers", *responses[hpaListHandler])
}
if responses[scaleHandler] != nil {
handlers[scaleHandler] = mkHandler(
fmt.Sprintf("/apis/experimental/v1/namespaces/%s/replicationcontrollers/%s/scale", namespace, rcName), *responses[scaleHandler])
}
if responses[updateHpaHandler] != nil {
handlers[updateHpaHandler] = mkHandler(fmt.Sprintf("/apis/experimental/v1/namespaces/%s/horizontalpodautoscalers/%s", namespace, hpaName),
*responses[updateHpaHandler])
}
if responses[eventHandler] != nil {
handlers[eventHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/events", namespace),
*responses[eventHandler])
}
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
res.WriteHeader(http.StatusNotFound)
})
return httptest.NewServer(mux), handlers
}
func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
hpaResponse := serverResponse{http.StatusOK, &experimental.HorizontalPodAutoscalerList{
fakeClient := &testclient.Fake{}
fakeClient.AddReactor("list", "horizontalpodautoscalers", func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
obj := &experimental.HorizontalPodAutoscalerList{
Items: []experimental.HorizontalPodAutoscaler{
{
ObjectMeta: api.ObjectMeta{
Name: hpaName,
Namespace: namespace,
SelfLink: "/apis/experimental/v1/namespaces/" + namespace + "/horizontalpodautoscalers/" + hpaName,
SelfLink: "experimental/v1/namespaces/" + namespace + "/horizontalpodautoscalers/" + hpaName,
},
Spec: experimental.HorizontalPodAutoscalerSpec{
ScaleRef: &experimental.SubresourceReference{
@@ -130,82 +92,270 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
Namespace: namespace,
Subresource: "scale",
},
MinCount: 1,
MaxCount: 5,
Target: experimental.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse("0.3")},
MinCount: tc.minCount,
MaxCount: tc.maxCount,
Target: experimental.ResourceConsumption{Resource: tc.targetResource, Quantity: tc.targetLevel},
},
}}}}
},
},
}
return true, obj, nil
})
scaleResponse := serverResponse{http.StatusOK, &experimental.Scale{
fakeClient.AddReactor("get", "replicationController", func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
obj := &experimental.Scale{
ObjectMeta: api.ObjectMeta{
Name: rcName,
Namespace: namespace,
},
Spec: experimental.ScaleSpec{
Replicas: 1,
Replicas: tc.initialReplicas,
},
Status: experimental.ScaleStatus{
Replicas: 1,
Selector: map[string]string{"name": podNameLabel},
Replicas: tc.initialReplicas,
Selector: map[string]string{"name": podNamePrefix},
},
}}
status := experimental.HorizontalPodAutoscalerStatus{
CurrentReplicas: 1,
DesiredReplicas: 3,
}
updateHpaResponse := serverResponse{http.StatusOK, &experimental.HorizontalPodAutoscaler{
ObjectMeta: api.ObjectMeta{
Name: hpaName,
Namespace: namespace,
},
Spec: experimental.HorizontalPodAutoscalerSpec{
ScaleRef: &experimental.SubresourceReference{
Kind: "replicationController",
Name: rcName,
Namespace: namespace,
Subresource: "scale",
},
MinCount: 1,
MaxCount: 5,
Target: experimental.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse("0.3")},
},
Status: &status,
}}
eventResponse := serverResponse{http.StatusOK, &api.Event{}}
testServer, handlers := makeTestServer(t,
map[string]*serverResponse{
hpaListHandler: &hpaResponse,
scaleHandler: &scaleResponse,
updateHpaHandler: &updateHpaResponse,
eventHandler: &eventResponse,
return true, obj, nil
})
defer testServer.Close()
kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Experimental.Version()})
fakeRC := fakeResourceConsumptionClient{metrics: map[api.ResourceName]experimental.ResourceConsumption{
api.ResourceCPU: {Resource: api.ResourceCPU, Quantity: resource.MustParse("650m")},
}}
fake := fakeMetricsClient{consumption: &fakeRC}
fakeClient.AddReactor("list", "pods", func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
obj := &api.PodList{}
for i := 0; i < tc.initialReplicas; i++ {
podName := fmt.Sprintf("%s-%d", podNamePrefix, i)
pod := api.Pod{
Status: api.PodStatus{
Phase: api.PodRunning,
},
ObjectMeta: api.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: map[string]string{
"name": podNamePrefix,
},
},
}
obj.Items = append(obj.Items, pod)
}
return true, obj, nil
})
hpaController := NewHorizontalController(kubeClient, &fake)
fakeClient.AddProxyReactor("services", func(action testclient.Action) (handled bool, ret client.ResponseWrapper, err error) {
timestamp := time.Now()
metrics := heapster.MetricResultList{}
for _, level := range tc.reportedLevels {
metric := heapster.MetricResult{
Metrics: []heapster.MetricPoint{{timestamp, level}},
LatestTimestamp: timestamp,
}
metrics.Items = append(metrics.Items, metric)
}
heapsterRawMemResponse, _ := json.Marshal(&metrics)
return true, newFakeResponseWrapper(heapsterRawMemResponse), nil
})
fakeClient.AddReactor("update", "replicationController", func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(testclient.UpdateAction).GetObject().(*experimental.Scale)
replicas := action.(testclient.UpdateAction).GetObject().(*experimental.Scale).Spec.Replicas
assert.Equal(t, tc.desiredReplicas, replicas)
tc.scaleUpdated = true
return true, obj, nil
})
fakeClient.AddReactor("update", "horizontalpodautoscalers", func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(testclient.UpdateAction).GetObject().(*experimental.HorizontalPodAutoscaler)
assert.Equal(t, namespace, obj.Namespace)
assert.Equal(t, hpaName, obj.Name)
assert.Equal(t, tc.desiredReplicas, obj.Status.DesiredReplicas)
return true, obj, nil
})
fakeClient.AddReactor("*", "events", func(action testclient.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(testclient.CreateAction).GetObject().(*api.Event)
if tc.verifyEvents {
assert.Equal(t, "SuccessfulRescale", obj.Reason)
assert.Equal(t, fmt.Sprintf("New size: %d", tc.desiredReplicas), obj.Message)
}
tc.eventCreated = true
return true, obj, nil
})
return fakeClient
}
// verifyResults asserts that a scale update (and, when verifyEvents is set, an
// event) was issued exactly when desiredReplicas differs from initialReplicas.
func (tc *testCase) verifyResults(t *testing.T) {
assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.scaleUpdated)
if tc.verifyEvents {
assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.eventCreated)
}
}
func (tc *testCase) runTest(t *testing.T) {
testClient := tc.prepareTestClient(t)
hpaController := NewHorizontalController(testClient, metrics.NewHeapsterMetricsClient(testClient))
err := hpaController.reconcileAutoscalers()
if err != nil {
t.Fatal("Failed to reconcile: %v", err)
assert.Equal(t, nil, err)
if tc.verifyEvents {
// We need to wait for events to be broadcasted (sleep for longer than record.sleepDuration).
time.Sleep(12 * time.Second)
}
for _, h := range handlers {
h.ValidateRequestCount(t, 1)
tc.verifyResults(t)
}
obj, err := kubeClient.Codec.Decode([]byte(handlers[updateHpaHandler].RequestBody))
if err != nil {
t.Fatal("Failed to decode: %v %v", err)
}
hpa, _ := obj.(*experimental.HorizontalPodAutoscaler)
assert.Equal(t, 3, hpa.Status.DesiredReplicas)
assert.Equal(t, int64(650), hpa.Status.CurrentConsumption.Quantity.MilliValue())
assert.NotNil(t, hpa.Status.LastScaleTimestamp)
// TestCPU expects a scale-up from 1 to 2 replicas: the pod reports 200m CPU
// against a 0.1-core (100m) target.
func TestCPU(t *testing.T) {
tc := testCase{
minCount: 1,
maxCount: 5,
initialReplicas: 1,
desiredReplicas: 2,
targetResource: api.ResourceCPU,
targetLevel: resource.MustParse("0.1"),
reportedLevels: []uint64{200},
}
tc.runTest(t)
}
// TestMemory expects a scale-up from 1 to 2 replicas: reported memory (2000)
// is double the 1k target.
func TestMemory(t *testing.T) {
tc := testCase{
minCount: 1,
maxCount: 5,
initialReplicas: 1,
desiredReplicas: 2,
targetResource: api.ResourceMemory,
targetLevel: resource.MustParse("1k"),
reportedLevels: []uint64{2000},
}
tc.runTest(t)
}
// TestScaleUp expects 3 -> 5 replicas: average reported memory (5000) exceeds
// the 3k target.
func TestScaleUp(t *testing.T) {
tc := testCase{
minCount: 2,
maxCount: 6,
initialReplicas: 3,
desiredReplicas: 5,
targetResource: api.ResourceMemory,
targetLevel: resource.MustParse("3k"),
reportedLevels: []uint64{3000, 5000, 7000},
}
tc.runTest(t)
}
// TestScaleDown expects 5 -> 3 replicas: average reported CPU (280m) is well
// below the 500m target.
func TestScaleDown(t *testing.T) {
tc := testCase{
minCount: 2,
maxCount: 6,
initialReplicas: 5,
desiredReplicas: 3,
targetResource: api.ResourceCPU,
targetLevel: resource.MustParse("0.5"),
reportedLevels: []uint64{100, 300, 500, 250, 250},
}
tc.runTest(t)
}
// TestTolerance expects no change: reported levels hover just above the 1k
// target, within the controller's tolerance band.
func TestTolerance(t *testing.T) {
tc := testCase{
minCount: 1,
maxCount: 5,
initialReplicas: 3,
desiredReplicas: 3,
targetResource: api.ResourceMemory,
targetLevel: resource.MustParse("1k"),
reportedLevels: []uint64{1010, 1030, 1020},
}
tc.runTest(t)
}
// TestMinCount expects the scale-down to be clamped at minCount (2) even
// though consumption alone would justify fewer replicas.
func TestMinCount(t *testing.T) {
tc := testCase{
minCount: 2,
maxCount: 5,
initialReplicas: 3,
desiredReplicas: 2,
targetResource: api.ResourceMemory,
targetLevel: resource.MustParse("1k"),
reportedLevels: []uint64{10, 95, 10},
}
tc.runTest(t)
}
// TestMaxCount expects the scale-up to be clamped at maxCount (5) even though
// consumption alone would justify more replicas.
func TestMaxCount(t *testing.T) {
tc := testCase{
minCount: 2,
maxCount: 5,
initialReplicas: 3,
desiredReplicas: 5,
targetResource: api.ResourceMemory,
targetLevel: resource.MustParse("1k"),
reportedLevels: []uint64{8000, 9500, 1000},
}
tc.runTest(t)
}
// TestSuperfluousMetrics reports more metric samples (6) than pods (4);
// the controller is expected to leave the replica count unchanged.
func TestSuperfluousMetrics(t *testing.T) {
tc := testCase{
minCount: 2,
maxCount: 6,
initialReplicas: 4,
desiredReplicas: 4,
targetResource: api.ResourceMemory,
targetLevel: resource.MustParse("1k"),
reportedLevels: []uint64{4000, 9500, 3000, 7000, 3200, 2000},
}
tc.runTest(t)
}
// TestMissingMetrics reports fewer metric samples (2) than pods (4);
// the controller is expected to leave the replica count unchanged.
func TestMissingMetrics(t *testing.T) {
tc := testCase{
minCount: 2,
maxCount: 6,
initialReplicas: 4,
desiredReplicas: 4,
targetResource: api.ResourceMemory,
targetLevel: resource.MustParse("1k"),
reportedLevels: []uint64{400, 95},
}
tc.runTest(t)
}
// TestEmptyMetrics reports no metric samples at all; the controller is
// expected to leave the replica count unchanged.
func TestEmptyMetrics(t *testing.T) {
tc := testCase{
minCount: 2,
maxCount: 6,
initialReplicas: 4,
desiredReplicas: 4,
targetResource: api.ResourceMemory,
targetLevel: resource.MustParse("1k"),
reportedLevels: []uint64{},
}
tc.runTest(t)
}
// TestEventCreated scales 1 -> 2 and, via verifyEvents, checks that a
// SuccessfulRescale event with the new size is broadcast.
func TestEventCreated(t *testing.T) {
tc := testCase{
minCount: 1,
maxCount: 5,
initialReplicas: 1,
desiredReplicas: 2,
targetResource: api.ResourceCPU,
targetLevel: resource.MustParse("0.1"),
reportedLevels: []uint64{200},
verifyEvents: true,
}
tc.runTest(t)
}
// TestEventNotCreated keeps the replica count at 2 (consumption matches the
// target) and, via verifyEvents, checks that no rescale event is broadcast.
func TestEventNotCreated(t *testing.T) {
tc := testCase{
minCount: 1,
maxCount: 5,
initialReplicas: 2,
desiredReplicas: 2,
targetResource: api.ResourceCPU,
targetLevel: resource.MustParse("0.2"),
reportedLevels: []uint64{200, 200},
verifyEvents: true,
}
tc.runTest(t)
}