diff --git a/pkg/client/unversioned/testclient/fake_services.go b/pkg/client/unversioned/testclient/fake_services.go index a24b2db5bb7..49191fbd45f 100644 --- a/pkg/client/unversioned/testclient/fake_services.go +++ b/pkg/client/unversioned/testclient/fake_services.go @@ -77,6 +77,5 @@ func (c *FakeServices) Watch(label labels.Selector, field fields.Selector, resou } func (c *FakeServices) ProxyGet(name, path string, params map[string]string) unversioned.ResponseWrapper { - c.Fake.Invokes(NewProxyGetAction("services", c.Namespace, name, path, params), nil) - return nil + return c.Fake.InvokesProxy(NewProxyGetAction("services", c.Namespace, name, path, params)) } diff --git a/pkg/client/unversioned/testclient/fixture.go b/pkg/client/unversioned/testclient/fixture.go index ce0f750e757..bcecd9b09a1 100644 --- a/pkg/client/unversioned/testclient/fixture.go +++ b/pkg/client/unversioned/testclient/fixture.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" + client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/yaml" "k8s.io/kubernetes/pkg/watch" @@ -260,7 +261,7 @@ func (r *SimpleReactor) React(action Action) (bool, runtime.Object, error) { return r.Reaction(action) } -// SimpleWatchReactor is a Reactor. Each reaction function is attached to a given verb,resource tuple. "*" in either field matches everything for that value. +// SimpleWatchReactor is a WatchReactor. Each reaction function is attached to a given resource. "*" matches everything for that value. // For instance, *,pods matches all verbs on pods. This allows for easier composition of reaction functions type SimpleWatchReactor struct { Resource string @@ -280,3 +281,24 @@ func (r *SimpleWatchReactor) Handles(action Action) bool { func (r *SimpleWatchReactor) React(action Action) (bool, watch.Interface, error) { return r.Reaction(action) } + +// SimpleProxyReactor is a ProxyReactor. Each reaction function is attached to a given resource. "*" matches everything for that value. +// For instance, *,pods matches all verbs on pods. This allows for easier composition of reaction functions. +type SimpleProxyReactor struct { + Resource string + + Reaction ProxyReactionFunc +} + +func (r *SimpleProxyReactor) Handles(action Action) bool { + resourceCovers := r.Resource == "*" || r.Resource == action.GetResource() + if !resourceCovers { + return false + } + + return true +} + +func (r *SimpleProxyReactor) React(action Action) (bool, client.ResponseWrapper, error) { + return r.Reaction(action) +} diff --git a/pkg/client/unversioned/testclient/testclient.go b/pkg/client/unversioned/testclient/testclient.go index 6dde3db6703..21dc5aba23e 100644 --- a/pkg/client/unversioned/testclient/testclient.go +++ b/pkg/client/unversioned/testclient/testclient.go @@ -54,6 +54,8 @@ type Fake struct { ReactionChain []Reactor // WatchReactionChain is the list of watch reactors that will be attempted for every request in the order they are tried WatchReactionChain []WatchReactor + // ProxyReactionChain is the list of proxy reactors that will be attempted for every request in the order they are tried + ProxyReactionChain []ProxyReactor } // Reactor is an interface to allow the composition of reaction functions. @@ -72,6 +74,14 @@ type WatchReactor interface { React(action Action) (handled bool, ret watch.Interface, err error) } +// ProxyReactor is an interface to allow the composition of proxy get functions. +type ProxyReactor interface { + // Handles indicates whether or not this Reactor deals with a given action + Handles(action Action) bool + // React handles a watch action and returns results. It may choose to delegate by indicated handled=false + React(action Action) (handled bool, ret client.ResponseWrapper, err error) +} + // ReactionFunc is a function that returns an object or error for a given Action. If "handled" is false, // then the test client will continue ignore the results and continue to the next ReactionFunc type ReactionFunc func(action Action) (handled bool, ret runtime.Object, err error) @@ -80,6 +90,10 @@ type ReactionFunc func(action Action) (handled bool, ret runtime.Object, err err // then the test client will continue ignore the results and continue to the next ReactionFunc type WatchReactionFunc func(action Action) (handled bool, ret watch.Interface, err error) +// ProxyReactionFunc is a function that returns a ResponseWrapper interface for a given Action. If "handled" is false, +// then the test client will continue ignore the results and continue to the next ProxyReactionFunc +type ProxyReactionFunc func(action Action) (handled bool, ret client.ResponseWrapper, err error) + // AddReactor appends a reactor to the end of the chain func (c *Fake) AddReactor(verb, resource string, reaction ReactionFunc) { c.ReactionChain = append(c.ReactionChain, &SimpleReactor{verb, resource, reaction}) @@ -97,6 +111,11 @@ func (c *Fake) AddWatchReactor(resource string, reaction WatchReactionFunc) { c.WatchReactionChain = append(c.WatchReactionChain, &SimpleWatchReactor{resource, reaction}) } +// AddProxyReactor appends a reactor to the end of the chain +func (c *Fake) AddProxyReactor(resource string, reaction ProxyReactionFunc) { + c.ProxyReactionChain = append(c.ProxyReactionChain, &SimpleProxyReactor{resource, reaction}) +} + // Invokes records the provided Action and then invokes the ReactFn (if provided). // defaultReturnObj is expected to be of the same type a normal call would return. func (c *Fake) Invokes(action Action, defaultReturnObj runtime.Object) (runtime.Object, error) { @@ -142,6 +161,28 @@ func (c *Fake) InvokesWatch(action Action) (watch.Interface, error) { return nil, fmt.Errorf("unhandled watch: %#v", action) } +// InvokesProxy records the provided Action and then invokes the ReactFn (if provided). +func (c *Fake) InvokesProxy(action Action) client.ResponseWrapper { + c.Lock() + defer c.Unlock() + + c.actions = append(c.actions, action) + for _, reactor := range c.ProxyReactionChain { + if !reactor.Handles(action) { + continue + } + + handled, ret, err := reactor.React(action) + if !handled || err != nil { + continue + } + + return ret + } + + return nil +} + // ClearActions clears the history of actions called on the fake client func (c *Fake) ClearActions() { c.Lock() diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 953642b67a2..87168128a95 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -79,8 +79,9 @@ func (a *HorizontalController) reconcileAutoscaler(hpa experimental.HorizontalPo return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err) } currentReplicas := scale.Status.Replicas - currentConsumption, err := a.metricsClient.ResourceConsumption(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.Target.Resource, - scale.Status.Selector) + currentConsumption, err := a.metricsClient. + ResourceConsumption(hpa.Spec.ScaleRef.Namespace). + Get(hpa.Spec.Target.Resource, scale.Status.Selector) // TODO: what to do on partial errors (like metrics obtained for 75% of pods). if err != nil { diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go index 4b7bb9d4690..965bebb1376 100644 --- a/pkg/controller/podautoscaler/horizontal_test.go +++ b/pkg/controller/podautoscaler/horizontal_test.go @@ -17,195 +17,345 @@ limitations under the License. package podautoscaler import ( + "encoding/json" "fmt" - "net/http" - "net/http/httptest" + "io" "testing" + "time" "k8s.io/kubernetes/pkg/api" _ "k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/api/resource" - "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/apis/experimental" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" - "github.com/golang/glog" + heapster "k8s.io/heapster/api/v1/types" + "github.com/stretchr/testify/assert" ) -const ( - namespace = api.NamespaceDefault - rcName = "app-rc" - podNameLabel = "app" - hpaName = "foo" - - hpaListHandler = "HpaList" - scaleHandler = "Scale" - updateHpaHandler = "HpaUpdate" - eventHandler = "Event" -) - -type serverResponse struct { - statusCode int - obj interface{} +func (w fakeResponseWrapper) DoRaw() ([]byte, error) { + return w.raw, nil } -type fakeMetricsClient struct { - consumption metrics.ResourceConsumptionClient +func (w fakeResponseWrapper) Stream() (io.ReadCloser, error) { + return nil, nil } -type fakeResourceConsumptionClient struct { - metrics map[api.ResourceName]experimental.ResourceConsumption +func newFakeResponseWrapper(raw []byte) fakeResponseWrapper { + return fakeResponseWrapper{raw: raw} } -func (f *fakeMetricsClient) ResourceConsumption(namespace string) metrics.ResourceConsumptionClient { - return f.consumption +type fakeResponseWrapper struct { + raw []byte } -func (f *fakeResourceConsumptionClient) Get(resource api.ResourceName, selector map[string]string) (*experimental.ResourceConsumption, error) { - consumption, found := f.metrics[resource] - if !found { - return nil, fmt.Errorf("resource not found: %v", resource) - } - return &consumption, nil +type testCase struct { + minCount int + maxCount int + initialReplicas int + desiredReplicas int + targetResource api.ResourceName + targetLevel resource.Quantity + reportedLevels []uint64 + scaleUpdated bool + eventCreated bool + verifyEvents bool } -func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) { +func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake { + namespace := "test-namespace" + hpaName := "test-hpa" + rcName := "test-rc" + podNamePrefix := "test-pod" - handlers := map[string]*util.FakeHandler{} - mux := http.NewServeMux() + tc.scaleUpdated = false + tc.eventCreated = false - mkHandler := func(url string, response serverResponse) *util.FakeHandler { - handler := util.FakeHandler{ - StatusCode: response.statusCode, - ResponseBody: runtime.EncodeOrDie(testapi.Experimental.Codec(), response.obj.(runtime.Object)), - } - mux.Handle(url, &handler) - glog.Infof("Will handle %s", url) - return &handler - } - - if responses[hpaListHandler] != nil { - handlers[hpaListHandler] = mkHandler("/apis/experimental/v1/horizontalpodautoscalers", *responses[hpaListHandler]) - } - - if responses[scaleHandler] != nil { - handlers[scaleHandler] = mkHandler( - fmt.Sprintf("/apis/experimental/v1/namespaces/%s/replicationcontrollers/%s/scale", namespace, rcName), *responses[scaleHandler]) - } - - if responses[updateHpaHandler] != nil { - handlers[updateHpaHandler] = mkHandler(fmt.Sprintf("/apis/experimental/v1/namespaces/%s/horizontalpodautoscalers/%s", namespace, hpaName), - *responses[updateHpaHandler]) - } - - if responses[eventHandler] != nil { - handlers[eventHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/events", namespace), - *responses[eventHandler]) - } - - mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { - t.Errorf("unexpected request: %v", req.RequestURI) - res.WriteHeader(http.StatusNotFound) - }) - return httptest.NewServer(mux), handlers -} - -func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { - hpaResponse := serverResponse{http.StatusOK, &experimental.HorizontalPodAutoscalerList{ - Items: []experimental.HorizontalPodAutoscaler{ - { - ObjectMeta: api.ObjectMeta{ - Name: hpaName, - Namespace: namespace, - SelfLink: "/apis/experimental/v1/namespaces/" + namespace + "/horizontalpodautoscalers/" + hpaName, - }, - Spec: experimental.HorizontalPodAutoscalerSpec{ - ScaleRef: &experimental.SubresourceReference{ - Kind: "replicationController", - Name: rcName, - Namespace: namespace, - Subresource: "scale", + fakeClient := &testclient.Fake{} + fakeClient.AddReactor("list", "horizontalpodautoscalers", func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + obj := &experimental.HorizontalPodAutoscalerList{ + Items: []experimental.HorizontalPodAutoscaler{ + { + ObjectMeta: api.ObjectMeta{ + Name: hpaName, + Namespace: namespace, + SelfLink: "experimental/v1/namespaces/" + namespace + "/horizontalpodautoscalers/" + hpaName, + }, + Spec: experimental.HorizontalPodAutoscalerSpec{ + ScaleRef: &experimental.SubresourceReference{ + Kind: "replicationController", + Name: rcName, + Namespace: namespace, + Subresource: "scale", + }, + MinCount: tc.minCount, + MaxCount: tc.maxCount, + Target: experimental.ResourceConsumption{Resource: tc.targetResource, Quantity: tc.targetLevel}, }, - MinCount: 1, - MaxCount: 5, - Target: experimental.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse("0.3")}, }, - }}}} - - scaleResponse := serverResponse{http.StatusOK, &experimental.Scale{ - ObjectMeta: api.ObjectMeta{ - Name: rcName, - Namespace: namespace, - }, - Spec: experimental.ScaleSpec{ - Replicas: 1, - }, - Status: experimental.ScaleStatus{ - Replicas: 1, - Selector: map[string]string{"name": podNameLabel}, - }, - }} - - status := experimental.HorizontalPodAutoscalerStatus{ - CurrentReplicas: 1, - DesiredReplicas: 3, - } - updateHpaResponse := serverResponse{http.StatusOK, &experimental.HorizontalPodAutoscaler{ - ObjectMeta: api.ObjectMeta{ - Name: hpaName, - Namespace: namespace, - }, - Spec: experimental.HorizontalPodAutoscalerSpec{ - ScaleRef: &experimental.SubresourceReference{ - Kind: "replicationController", - Name: rcName, - Namespace: namespace, - Subresource: "scale", }, - MinCount: 1, - MaxCount: 5, - Target: experimental.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse("0.3")}, - }, - Status: &status, - }} + } + return true, obj, nil + }) - eventResponse := serverResponse{http.StatusOK, &api.Event{}} + fakeClient.AddReactor("get", "replicationController", func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + obj := &experimental.Scale{ + ObjectMeta: api.ObjectMeta{ + Name: rcName, + Namespace: namespace, + }, + Spec: experimental.ScaleSpec{ + Replicas: tc.initialReplicas, + }, + Status: experimental.ScaleStatus{ + Replicas: tc.initialReplicas, + Selector: map[string]string{"name": podNamePrefix}, + }, + } + return true, obj, nil + }) - testServer, handlers := makeTestServer(t, - map[string]*serverResponse{ - hpaListHandler: &hpaResponse, - scaleHandler: &scaleResponse, - updateHpaHandler: &updateHpaResponse, - eventHandler: &eventResponse, - }) + fakeClient.AddReactor("list", "pods", func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + obj := &api.PodList{} + for i := 0; i < tc.initialReplicas; i++ { + podName := fmt.Sprintf("%s-%d", podNamePrefix, i) + pod := api.Pod{ + Status: api.PodStatus{ + Phase: api.PodRunning, + }, + ObjectMeta: api.ObjectMeta{ + Name: podName, + Namespace: namespace, + Labels: map[string]string{ + "name": podNamePrefix, + }, + }, + } + obj.Items = append(obj.Items, pod) + } + return true, obj, nil + }) - defer testServer.Close() - kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Experimental.Version()}) - fakeRC := fakeResourceConsumptionClient{metrics: map[api.ResourceName]experimental.ResourceConsumption{ - api.ResourceCPU: {Resource: api.ResourceCPU, Quantity: resource.MustParse("650m")}, - }} - fake := fakeMetricsClient{consumption: &fakeRC} + fakeClient.AddProxyReactor("services", func(action testclient.Action) (handled bool, ret client.ResponseWrapper, err error) { + timestamp := time.Now() + metrics := heapster.MetricResultList{} + for _, level := range tc.reportedLevels { + metric := heapster.MetricResult{ + Metrics: []heapster.MetricPoint{{timestamp, level}}, + LatestTimestamp: timestamp, + } + metrics.Items = append(metrics.Items, metric) + } + heapsterRawMemResponse, _ := json.Marshal(&metrics) + return true, newFakeResponseWrapper(heapsterRawMemResponse), nil + }) - hpaController := NewHorizontalController(kubeClient, &fake) + fakeClient.AddReactor("update", "replicationController", func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(testclient.UpdateAction).GetObject().(*experimental.Scale) + replicas := action.(testclient.UpdateAction).GetObject().(*experimental.Scale).Spec.Replicas + assert.Equal(t, tc.desiredReplicas, replicas) + tc.scaleUpdated = true + return true, obj, nil + }) - err := hpaController.reconcileAutoscalers() - if err != nil { - t.Fatal("Failed to reconcile: %v", err) - } - for _, h := range handlers { - h.ValidateRequestCount(t, 1) - } - obj, err := kubeClient.Codec.Decode([]byte(handlers[updateHpaHandler].RequestBody)) - if err != nil { - t.Fatal("Failed to decode: %v %v", err) - } - hpa, _ := obj.(*experimental.HorizontalPodAutoscaler) + fakeClient.AddReactor("update", "horizontalpodautoscalers", func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(testclient.UpdateAction).GetObject().(*experimental.HorizontalPodAutoscaler) + assert.Equal(t, namespace, obj.Namespace) + assert.Equal(t, hpaName, obj.Name) + assert.Equal(t, tc.desiredReplicas, obj.Status.DesiredReplicas) + return true, obj, nil + }) - assert.Equal(t, 3, hpa.Status.DesiredReplicas) - assert.Equal(t, int64(650), hpa.Status.CurrentConsumption.Quantity.MilliValue()) - assert.NotNil(t, hpa.Status.LastScaleTimestamp) + fakeClient.AddReactor("*", "events", func(action testclient.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(testclient.CreateAction).GetObject().(*api.Event) + if tc.verifyEvents { + assert.Equal(t, "SuccessfulRescale", obj.Reason) + assert.Equal(t, fmt.Sprintf("New size: %d", tc.desiredReplicas), obj.Message) + } + tc.eventCreated = true + return true, obj, nil + }) + + return fakeClient +} + +func (tc *testCase) verifyResults(t *testing.T) { + assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.scaleUpdated) + if tc.verifyEvents { + assert.Equal(t, tc.initialReplicas != tc.desiredReplicas, tc.eventCreated) + } +} + +func (tc *testCase) runTest(t *testing.T) { + testClient := tc.prepareTestClient(t) + hpaController := NewHorizontalController(testClient, metrics.NewHeapsterMetricsClient(testClient)) + err := hpaController.reconcileAutoscalers() + assert.Equal(t, nil, err) + if tc.verifyEvents { + // We need to wait for events to be broadcasted (sleep for longer than record.sleepDuration). + time.Sleep(12 * time.Second) + } + tc.verifyResults(t) +} + +func TestCPU(t *testing.T) { + tc := testCase{ + minCount: 1, + maxCount: 5, + initialReplicas: 1, + desiredReplicas: 2, + targetResource: api.ResourceCPU, + targetLevel: resource.MustParse("0.1"), + reportedLevels: []uint64{200}, + } + tc.runTest(t) +} + +func TestMemory(t *testing.T) { + tc := testCase{ + minCount: 1, + maxCount: 5, + initialReplicas: 1, + desiredReplicas: 2, + targetResource: api.ResourceMemory, + targetLevel: resource.MustParse("1k"), + reportedLevels: []uint64{2000}, + } + tc.runTest(t) +} + +func TestScaleUp(t *testing.T) { + tc := testCase{ + minCount: 2, + maxCount: 6, + initialReplicas: 3, + desiredReplicas: 5, + targetResource: api.ResourceMemory, + targetLevel: resource.MustParse("3k"), + reportedLevels: []uint64{3000, 5000, 7000}, + } + tc.runTest(t) +} + +func TestScaleDown(t *testing.T) { + tc := testCase{ + minCount: 2, + maxCount: 6, + initialReplicas: 5, + desiredReplicas: 3, + targetResource: api.ResourceCPU, + targetLevel: resource.MustParse("0.5"), + reportedLevels: []uint64{100, 300, 500, 250, 250}, + } + tc.runTest(t) +} + +func TestTolerance(t *testing.T) { + tc := testCase{ + minCount: 1, + maxCount: 5, + initialReplicas: 3, + desiredReplicas: 3, + targetResource: api.ResourceMemory, + targetLevel: resource.MustParse("1k"), + reportedLevels: []uint64{1010, 1030, 1020}, + } + tc.runTest(t) +} + +func TestMinCount(t *testing.T) { + tc := testCase{ + minCount: 2, + maxCount: 5, + initialReplicas: 3, + desiredReplicas: 2, + targetResource: api.ResourceMemory, + targetLevel: resource.MustParse("1k"), + reportedLevels: []uint64{10, 95, 10}, + } + tc.runTest(t) +} + +func TestMaxCount(t *testing.T) { + tc := testCase{ + minCount: 2, + maxCount: 5, + initialReplicas: 3, + desiredReplicas: 5, + targetResource: api.ResourceMemory, + targetLevel: resource.MustParse("1k"), + reportedLevels: []uint64{8000, 9500, 1000}, + } + tc.runTest(t) +} + +func TestSuperfluousMetrics(t *testing.T) { + tc := testCase{ + minCount: 2, + maxCount: 6, + initialReplicas: 4, + desiredReplicas: 4, + targetResource: api.ResourceMemory, + targetLevel: resource.MustParse("1k"), + reportedLevels: []uint64{4000, 9500, 3000, 7000, 3200, 2000}, + } + tc.runTest(t) +} + +func TestMissingMetrics(t *testing.T) { + tc := testCase{ + minCount: 2, + maxCount: 6, + initialReplicas: 4, + desiredReplicas: 4, + targetResource: api.ResourceMemory, + targetLevel: resource.MustParse("1k"), + reportedLevels: []uint64{400, 95}, + } + tc.runTest(t) +} + +func TestEmptyMetrics(t *testing.T) { + tc := testCase{ + minCount: 2, + maxCount: 6, + initialReplicas: 4, + desiredReplicas: 4, + targetResource: api.ResourceMemory, + targetLevel: resource.MustParse("1k"), + reportedLevels: []uint64{}, + } + tc.runTest(t) +} + +func TestEventCreated(t *testing.T) { + tc := testCase{ + minCount: 1, + maxCount: 5, + initialReplicas: 1, + desiredReplicas: 2, + targetResource: api.ResourceCPU, + targetLevel: resource.MustParse("0.1"), + reportedLevels: []uint64{200}, + verifyEvents: true, + } + tc.runTest(t) +} + +func TestEventNotCreated(t *testing.T) { + tc := testCase{ + minCount: 1, + maxCount: 5, + initialReplicas: 2, + desiredReplicas: 2, + targetResource: api.ResourceCPU, + targetLevel: resource.MustParse("0.2"), + reportedLevels: []uint64{200, 200}, + verifyEvents: true, + } + tc.runTest(t) }