From e8ceee0d6f413084eefbc7a169770a9a00ba74f4 Mon Sep 17 00:00:00 2001 From: Jerzy Szczepkowski Date: Mon, 14 Sep 2015 15:08:43 +0200 Subject: [PATCH] Events for HorizontalPodAutoscalers. Implemented events for reconciliation of HorizontalPodAutoscalers. --- pkg/controller/podautoscaler/horizontal.go | 172 ++++++++++-------- .../podautoscaler/horizontal_test.go | 12 +- 2 files changed, 104 insertions(+), 80 deletions(-) diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 938616c8116..953642b67a2 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -24,6 +24,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/experimental" + "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" "k8s.io/kubernetes/pkg/fields" @@ -43,15 +44,21 @@ const ( type HorizontalController struct { client client.Interface metricsClient metrics.MetricsClient + eventRecorder record.EventRecorder } var downscaleForbiddenWindow, _ = time.ParseDuration("20m") var upscaleForbiddenWindow, _ = time.ParseDuration("3m") func NewHorizontalController(client client.Interface, metricsClient metrics.MetricsClient) *HorizontalController { + broadcaster := record.NewBroadcaster() + broadcaster.StartRecordingToSink(client.Events("")) + recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"}) + return &HorizontalController{ client: client, metricsClient: metricsClient, + eventRecorder: recorder, } } @@ -63,6 +70,91 @@ func (a *HorizontalController) Run(syncPeriod time.Duration) { }, syncPeriod, util.NeverStop) } +func (a *HorizontalController) reconcileAutoscaler(hpa experimental.HorizontalPodAutoscaler) error { + reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Namespace, hpa.Spec.ScaleRef.Name) + + scale, 
err := a.client.Experimental().Scales(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Name) + if err != nil { + a.eventRecorder.Event(&hpa, "FailedGetScale", err.Error()) + return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err) + } + currentReplicas := scale.Status.Replicas + currentConsumption, err := a.metricsClient.ResourceConsumption(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.Target.Resource, + scale.Status.Selector) + + // TODO: what to do on partial errors (like metrics obtained for 75% of pods). + if err != nil { + a.eventRecorder.Event(&hpa, "FailedGetMetrics", err.Error()) + return fmt.Errorf("failed to get metrics for %s: %v", reference, err) + } + + usageRatio := float64(currentConsumption.Quantity.MilliValue()) / float64(hpa.Spec.Target.Quantity.MilliValue()) + desiredReplicas := int(math.Ceil(usageRatio * float64(currentReplicas))) + + if desiredReplicas < hpa.Spec.MinCount { + desiredReplicas = hpa.Spec.MinCount + } + + // TODO: remove when pod idling is done. + if desiredReplicas == 0 { + desiredReplicas = 1 + } + + if desiredReplicas > hpa.Spec.MaxCount { + desiredReplicas = hpa.Spec.MaxCount + } + now := time.Now() + rescale := false + + if desiredReplicas != currentReplicas { + // Going down only if the usageRatio dropped significantly below the target + // and there was no rescaling in the last downscaleForbiddenWindow. + if desiredReplicas < currentReplicas && usageRatio < (1-tolerance) && + (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil || + hpa.Status.LastScaleTimestamp.Add(downscaleForbiddenWindow).Before(now)) { + rescale = true + } + + // Going up only if the usage ratio increased significantly above the target + // and there was no rescaling in the last upscaleForbiddenWindow. 
+ if desiredReplicas > currentReplicas && usageRatio > (1+tolerance) && + (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil || + hpa.Status.LastScaleTimestamp.Add(upscaleForbiddenWindow).Before(now)) { + rescale = true + } + } + + if rescale { + scale.Spec.Replicas = desiredReplicas + _, err = a.client.Experimental().Scales(hpa.Namespace).Update(hpa.Spec.ScaleRef.Kind, scale) + if err != nil { + a.eventRecorder.Eventf(&hpa, "FailedRescale", "New size: %d; error: %v", desiredReplicas, err.Error()) + return fmt.Errorf("failed to rescale %s: %v", reference, err) + } + a.eventRecorder.Eventf(&hpa, "SuccessfulRescale", "New size: %d", desiredReplicas) + } else { + desiredReplicas = currentReplicas + } + + status := experimental.HorizontalPodAutoscalerStatus{ + CurrentReplicas: currentReplicas, + DesiredReplicas: desiredReplicas, + CurrentConsumption: currentConsumption, + } + hpa.Status = &status + if rescale { + now := util.NewTime(now) + hpa.Status.LastScaleTimestamp = &now + } + + _, err = a.client.Experimental().HorizontalPodAutoscalers(hpa.Namespace).Update(&hpa) + if err != nil { + a.eventRecorder.Event(&hpa, "FailedUpdateStatus", err.Error()) + return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err) + } + return nil +} + func (a *HorizontalController) reconcileAutoscalers() error { ns := api.NamespaceAll list, err := a.client.Experimental().HorizontalPodAutoscalers(ns).List(labels.Everything(), fields.Everything()) @@ -70,85 +162,9 @@ func (a *HorizontalController) reconcileAutoscalers() error { return fmt.Errorf("error listing nodes: %v", err) } for _, hpa := range list.Items { - reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Namespace, hpa.Spec.ScaleRef.Name) - - scale, err := a.client.Experimental().Scales(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Name) + err := a.reconcileAutoscaler(hpa) if err != nil { - glog.Warningf("Failed to query scale subresource for %s: %v", 
reference, err) - continue - } - currentReplicas := scale.Status.Replicas - currentConsumption, err := a.metricsClient.ResourceConsumption(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.Target.Resource, - scale.Status.Selector) - - // TODO: what to do on partial errors (like metrics obtained for 75% of pods). - if err != nil { - glog.Warningf("Error while getting metrics for %s: %v", reference, err) - continue - } - - usageRatio := float64(currentConsumption.Quantity.MilliValue()) / float64(hpa.Spec.Target.Quantity.MilliValue()) - desiredReplicas := int(math.Ceil(usageRatio * float64(currentReplicas))) - - if desiredReplicas < hpa.Spec.MinCount { - desiredReplicas = hpa.Spec.MinCount - } - - // TODO: remove when pod ideling is done. - if desiredReplicas == 0 { - desiredReplicas = 1 - } - - if desiredReplicas > hpa.Spec.MaxCount { - desiredReplicas = hpa.Spec.MaxCount - } - now := time.Now() - rescale := false - - if desiredReplicas != currentReplicas { - // Going down only if the usageRatio dropped significantly below the target - // and there was no rescaling in the last downscaleForbiddenWindow. - if desiredReplicas < currentReplicas && usageRatio < (1-tolerance) && - (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil || - hpa.Status.LastScaleTimestamp.Add(downscaleForbiddenWindow).Before(now)) { - rescale = true - } - - // Going up only if the usage ratio increased significantly above the target - // and there was no rescaling in the last upscaleForbiddenWindow. 
- if desiredReplicas > currentReplicas && usageRatio > (1+tolerance) && - (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil || - hpa.Status.LastScaleTimestamp.Add(upscaleForbiddenWindow).Before(now)) { - rescale = true - } - } - - if rescale { - scale.Spec.Replicas = desiredReplicas - _, err = a.client.Experimental().Scales(hpa.Namespace).Update(hpa.Spec.ScaleRef.Kind, scale) - if err != nil { - glog.Warningf("Failed to rescale %s: %v", reference, err) - continue - } - } else { - desiredReplicas = currentReplicas - } - - status := experimental.HorizontalPodAutoscalerStatus{ - CurrentReplicas: currentReplicas, - DesiredReplicas: desiredReplicas, - CurrentConsumption: currentConsumption, - } - hpa.Status = &status - if rescale { - now := util.NewTime(now) - hpa.Status.LastScaleTimestamp = &now - } - - _, err = a.client.Experimental().HorizontalPodAutoscalers(hpa.Namespace).Update(&hpa) - if err != nil { - glog.Warningf("Failed to update HorizontalPodAutoscaler %s: %v", hpa.Name, err) - continue + glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err) } } return nil diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go index 1bb03ae810e..42aacd98a10 100644 --- a/pkg/controller/podautoscaler/horizontal_test.go +++ b/pkg/controller/podautoscaler/horizontal_test.go @@ -45,6 +45,7 @@ const ( hpaListHandler = "HpaList" scaleHandler = "Scale" updateHpaHandler = "HpaUpdate" + eventHandler = "Event" ) type serverResponse struct { @@ -101,6 +102,11 @@ func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httpte *responses[updateHpaHandler]) } + if responses[eventHandler] != nil { + handlers[eventHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/events", namespace), + *responses[eventHandler]) + } + mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { t.Errorf("unexpected request: %v", req.RequestURI) res.WriteHeader(http.StatusNotFound) @@ -109,13 +115,13 @@ func 
makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httpte } func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { - hpaResponse := serverResponse{http.StatusOK, &experimental.HorizontalPodAutoscalerList{ Items: []experimental.HorizontalPodAutoscaler{ { ObjectMeta: api.ObjectMeta{ Name: hpaName, Namespace: namespace, + SelfLink: "experimental/v1/namespaces/" + namespace + "/horizontalpodautoscalers/" + hpaName, }, Spec: experimental.HorizontalPodAutoscalerSpec{ ScaleRef: &experimental.SubresourceReference{ @@ -149,7 +155,6 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { DesiredReplicas: 3, } updateHpaResponse := serverResponse{http.StatusOK, &experimental.HorizontalPodAutoscaler{ - ObjectMeta: api.ObjectMeta{ Name: hpaName, Namespace: namespace, @@ -168,11 +173,14 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { Status: &status, }} + eventResponse := serverResponse{http.StatusOK, &api.Event{}} + testServer, handlers := makeTestServer(t, map[string]*serverResponse{ hpaListHandler: &hpaResponse, scaleHandler: &scaleResponse, updateHpaHandler: &updateHpaResponse, + eventHandler: &eventResponse, }) defer testServer.Close()