Merge pull request #13919 from jszczepkowski/hpa-events

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-09-14 08:48:48 -07:00
commit e583b27735
2 changed files with 104 additions and 80 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/experimental" "k8s.io/kubernetes/pkg/apis/experimental"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
@ -43,15 +44,21 @@ const (
type HorizontalController struct { type HorizontalController struct {
client client.Interface client client.Interface
metricsClient metrics.MetricsClient metricsClient metrics.MetricsClient
eventRecorder record.EventRecorder
} }
var downscaleForbiddenWindow, _ = time.ParseDuration("20m") var downscaleForbiddenWindow, _ = time.ParseDuration("20m")
var upscaleForbiddenWindow, _ = time.ParseDuration("3m") var upscaleForbiddenWindow, _ = time.ParseDuration("3m")
func NewHorizontalController(client client.Interface, metricsClient metrics.MetricsClient) *HorizontalController { func NewHorizontalController(client client.Interface, metricsClient metrics.MetricsClient) *HorizontalController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(client.Events(""))
recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"})
return &HorizontalController{ return &HorizontalController{
client: client, client: client,
metricsClient: metricsClient, metricsClient: metricsClient,
eventRecorder: recorder,
} }
} }
@ -63,19 +70,13 @@ func (a *HorizontalController) Run(syncPeriod time.Duration) {
}, syncPeriod, util.NeverStop) }, syncPeriod, util.NeverStop)
} }
func (a *HorizontalController) reconcileAutoscalers() error { func (a *HorizontalController) reconcileAutoscaler(hpa experimental.HorizontalPodAutoscaler) error {
ns := api.NamespaceAll
list, err := a.client.Experimental().HorizontalPodAutoscalers(ns).List(labels.Everything(), fields.Everything())
if err != nil {
return fmt.Errorf("error listing nodes: %v", err)
}
for _, hpa := range list.Items {
reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Namespace, hpa.Spec.ScaleRef.Name) reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Namespace, hpa.Spec.ScaleRef.Name)
scale, err := a.client.Experimental().Scales(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Name) scale, err := a.client.Experimental().Scales(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Name)
if err != nil { if err != nil {
glog.Warningf("Failed to query scale subresource for %s: %v", reference, err) a.eventRecorder.Event(&hpa, "FailedGetScale", err.Error())
continue return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
} }
currentReplicas := scale.Status.Replicas currentReplicas := scale.Status.Replicas
currentConsumption, err := a.metricsClient.ResourceConsumption(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.Target.Resource, currentConsumption, err := a.metricsClient.ResourceConsumption(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.Target.Resource,
@ -83,8 +84,8 @@ func (a *HorizontalController) reconcileAutoscalers() error {
// TODO: what to do on partial errors (like metrics obtained for 75% of pods). // TODO: what to do on partial errors (like metrics obtained for 75% of pods).
if err != nil { if err != nil {
glog.Warningf("Error while getting metrics for %s: %v", reference, err) a.eventRecorder.Event(&hpa, "FailedGetMetrics", err.Error())
continue return fmt.Errorf("failed to get metrics for %s: %v", reference, err)
} }
usageRatio := float64(currentConsumption.Quantity.MilliValue()) / float64(hpa.Spec.Target.Quantity.MilliValue()) usageRatio := float64(currentConsumption.Quantity.MilliValue()) / float64(hpa.Spec.Target.Quantity.MilliValue())
@ -127,9 +128,10 @@ func (a *HorizontalController) reconcileAutoscalers() error {
scale.Spec.Replicas = desiredReplicas scale.Spec.Replicas = desiredReplicas
_, err = a.client.Experimental().Scales(hpa.Namespace).Update(hpa.Spec.ScaleRef.Kind, scale) _, err = a.client.Experimental().Scales(hpa.Namespace).Update(hpa.Spec.ScaleRef.Kind, scale)
if err != nil { if err != nil {
glog.Warningf("Failed to rescale %s: %v", reference, err) a.eventRecorder.Eventf(&hpa, "FailedRescale", "New size: %d; error: %v", desiredReplicas, err.Error())
continue return fmt.Errorf("failed to rescale %s: %v", reference, err)
} }
a.eventRecorder.Eventf(&hpa, "SuccessfulRescale", "New size: %d", desiredReplicas)
} else { } else {
desiredReplicas = currentReplicas desiredReplicas = currentReplicas
} }
@ -147,8 +149,22 @@ func (a *HorizontalController) reconcileAutoscalers() error {
_, err = a.client.Experimental().HorizontalPodAutoscalers(hpa.Namespace).Update(&hpa) _, err = a.client.Experimental().HorizontalPodAutoscalers(hpa.Namespace).Update(&hpa)
if err != nil { if err != nil {
glog.Warningf("Failed to update HorizontalPodAutoscaler %s: %v", hpa.Name, err) a.eventRecorder.Event(&hpa, "FailedUpdateStatus", err.Error())
continue return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)
}
return nil
}
func (a *HorizontalController) reconcileAutoscalers() error {
ns := api.NamespaceAll
list, err := a.client.Experimental().HorizontalPodAutoscalers(ns).List(labels.Everything(), fields.Everything())
if err != nil {
return fmt.Errorf("error listing nodes: %v", err)
}
for _, hpa := range list.Items {
err := a.reconcileAutoscaler(hpa)
if err != nil {
glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
} }
} }
return nil return nil

View File

@ -45,6 +45,7 @@ const (
hpaListHandler = "HpaList" hpaListHandler = "HpaList"
scaleHandler = "Scale" scaleHandler = "Scale"
updateHpaHandler = "HpaUpdate" updateHpaHandler = "HpaUpdate"
eventHandler = "Event"
) )
type serverResponse struct { type serverResponse struct {
@ -101,6 +102,11 @@ func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httpte
*responses[updateHpaHandler]) *responses[updateHpaHandler])
} }
if responses[eventHandler] != nil {
handlers[eventHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/events", namespace),
*responses[eventHandler])
}
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI) t.Errorf("unexpected request: %v", req.RequestURI)
res.WriteHeader(http.StatusNotFound) res.WriteHeader(http.StatusNotFound)
@ -109,13 +115,13 @@ func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httpte
} }
func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
hpaResponse := serverResponse{http.StatusOK, &experimental.HorizontalPodAutoscalerList{ hpaResponse := serverResponse{http.StatusOK, &experimental.HorizontalPodAutoscalerList{
Items: []experimental.HorizontalPodAutoscaler{ Items: []experimental.HorizontalPodAutoscaler{
{ {
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: hpaName, Name: hpaName,
Namespace: namespace, Namespace: namespace,
SelfLink: "experimental/v1/namespaces/" + namespace + "/horizontalpodautoscalers/" + hpaName,
}, },
Spec: experimental.HorizontalPodAutoscalerSpec{ Spec: experimental.HorizontalPodAutoscalerSpec{
ScaleRef: &experimental.SubresourceReference{ ScaleRef: &experimental.SubresourceReference{
@ -149,7 +155,6 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
DesiredReplicas: 3, DesiredReplicas: 3,
} }
updateHpaResponse := serverResponse{http.StatusOK, &experimental.HorizontalPodAutoscaler{ updateHpaResponse := serverResponse{http.StatusOK, &experimental.HorizontalPodAutoscaler{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: hpaName, Name: hpaName,
Namespace: namespace, Namespace: namespace,
@ -168,11 +173,14 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
Status: &status, Status: &status,
}} }}
eventResponse := serverResponse{http.StatusOK, &api.Event{}}
testServer, handlers := makeTestServer(t, testServer, handlers := makeTestServer(t,
map[string]*serverResponse{ map[string]*serverResponse{
hpaListHandler: &hpaResponse, hpaListHandler: &hpaResponse,
scaleHandler: &scaleResponse, scaleHandler: &scaleResponse,
updateHpaHandler: &updateHpaResponse, updateHpaHandler: &updateHpaResponse,
eventHandler: &eventResponse,
}) })
defer testServer.Close() defer testServer.Close()