Switch hpa controller to shared informer

This commit is contained in:
Andy Goldstein 2017-02-09 14:59:19 -05:00
parent 74186d3e06
commit d11aa98c29
6 changed files with 74 additions and 58 deletions

View File

@ -44,6 +44,7 @@ func startHPAController(ctx ControllerContext) (bool, error) {
hpaClient.Extensions(),
hpaClient.Autoscaling(),
replicaCalc,
ctx.NewInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
ctx.Options.HorizontalPodAutoscalerSyncPeriod.Duration,
).Run(ctx.Stop)
return true, nil

View File

@ -24,15 +24,15 @@ go_library(
"//pkg/client/clientset_generated/clientset/typed/autoscaling/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/typed/extensions/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/autoscaling/v1:go_default_library",
"//pkg/client/listers/autoscaling/v1:go_default_library",
"//pkg/controller/podautoscaler/metrics:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache",
@ -49,7 +49,6 @@ go_test(
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/autoscaling/install:go_default_library",
@ -57,6 +56,8 @@ go_test(
"//pkg/apis/extensions/install:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/podautoscaler/metrics:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:github.com/stretchr/testify/require",
@ -65,11 +66,9 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/fake",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/rest",
"//vendor:k8s.io/client-go/testing",
"//vendor:k8s.io/client-go/tools/record",
"//vendor:k8s.io/heapster/metrics/api/v1/types",
"//vendor:k8s.io/heapster/metrics/apis/metrics/v1alpha1",
],

View File

@ -25,9 +25,7 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
@ -38,6 +36,8 @@ import (
extensionsv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
unversionedautoscaling "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/autoscaling/v1"
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1"
autoscalinginformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/autoscaling/v1"
autoscalinglisters "k8s.io/kubernetes/pkg/client/listers/autoscaling/v1"
)
const (
@ -65,27 +65,36 @@ type HorizontalController struct {
replicaCalc *ReplicaCalculator
eventRecorder record.EventRecorder
// A store of HPA objects, populated by the controller.
store cache.Store
// Watches changes to all HPA objects.
controller cache.Controller
// hpaLister is able to list/get HPAs from the shared cache from the informer passed in to
// NewHorizontalController.
hpaLister autoscalinglisters.HorizontalPodAutoscalerLister
hpaListerSynced cache.InformerSynced
}
var downscaleForbiddenWindow = 5 * time.Minute
var upscaleForbiddenWindow = 3 * time.Minute
func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, cache.Controller) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return controller.hpaNamespacer.HorizontalPodAutoscalers(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return controller.hpaNamespacer.HorizontalPodAutoscalers(metav1.NamespaceAll).Watch(options)
},
},
&autoscaling.HorizontalPodAutoscaler{},
resyncPeriod,
func NewHorizontalController(
evtNamespacer v1core.EventsGetter,
scaleNamespacer unversionedextensions.ScalesGetter,
hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter,
replicaCalc *ReplicaCalculator,
hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
resyncPeriod time.Duration,
) *HorizontalController {
broadcaster := record.NewBroadcaster()
// TODO: remove the wrapper when every clients have moved to use the clientset.
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"})
controller := &HorizontalController{
replicaCalc: replicaCalc,
eventRecorder: recorder,
scaleNamespacer: scaleNamespacer,
hpaNamespacer: hpaNamespacer,
}
hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
hpa := obj.(*autoscaling.HorizontalPodAutoscaler)
@ -108,31 +117,24 @@ func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (
},
// We are not interested in deletions.
},
resyncPeriod,
)
}
func NewHorizontalController(evtNamespacer v1core.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, replicaCalc *ReplicaCalculator, resyncPeriod time.Duration) *HorizontalController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"})
controller := &HorizontalController{
replicaCalc: replicaCalc,
eventRecorder: recorder,
scaleNamespacer: scaleNamespacer,
hpaNamespacer: hpaNamespacer,
}
store, frameworkController := newInformer(controller, resyncPeriod)
controller.store = store
controller.controller = frameworkController
controller.hpaLister = hpaInformer.Lister()
controller.hpaListerSynced = hpaInformer.Informer().HasSynced
return controller
}
func (a *HorizontalController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting HPA Controller")
go a.controller.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, a.hpaListerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
<-stopCh
glog.Infof("Shutting down HPA Controller")
}
@ -412,11 +414,18 @@ func shouldScale(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desi
func (a *HorizontalController) updateCurrentReplicasInStatus(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas int32) {
err := a.updateStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentCPUUtilizationPercentage, hpa.Annotations[HpaCustomMetricsStatusAnnotationName], false)
if err != nil {
glog.Errorf("%v", err)
utilruntime.HandleError(err)
}
}
func (a *HorizontalController) updateStatus(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, cpuCurrentUtilization *int32, cmStatus string, rescale bool) error {
// Make a copy so we don't mutate the object in the shared cache
copy, err := api.Scheme.DeepCopy(hpa)
if err != nil {
return nil
}
hpa = copy.(*autoscaling.HorizontalPodAutoscaler)
hpa.Status = autoscaling.HorizontalPodAutoscalerStatus{
CurrentReplicas: currentReplicas,
DesiredReplicas: desiredReplicas,
@ -432,7 +441,7 @@ func (a *HorizontalController) updateStatus(hpa *autoscaling.HorizontalPodAutosc
hpa.Status.LastScaleTime = &now
}
_, err := a.hpaNamespacer.HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(hpa)
_, err = a.hpaNamespacer.HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(hpa)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedUpdateStatus", err.Error())
return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)

View File

@ -32,17 +32,16 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientfake "k8s.io/client-go/kubernetes/fake"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
autoscaling "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
heapster "k8s.io/heapster/metrics/api/v1/types"
@ -54,6 +53,8 @@ import (
_ "k8s.io/kubernetes/pkg/apis/extensions/install"
)
func alwaysReady() bool { return true }
func (w fakeResponseWrapper) DoRaw() ([]byte, error) {
return w.raw, nil
}
@ -468,28 +469,26 @@ func (tc *testCase) runTest(t *testing.T) {
return true, obj, nil
})
broadcaster := record.NewBroadcasterForTests(0)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: eventClient.Core().Events("")})
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"})
replicaCalc := &ReplicaCalculator{
metricsClient: metricsClient,
podsGetter: testClient.Core(),
}
hpaController := &HorizontalController{
replicaCalc: replicaCalc,
eventRecorder: recorder,
scaleNamespacer: testClient.Extensions(),
hpaNamespacer: testClient.Autoscaling(),
}
informerFactory := informers.NewSharedInformerFactory(nil, testClient, controller.NoResyncPeriodFunc())
store, frameworkController := newInformer(hpaController, time.Minute)
hpaController.store = store
hpaController.controller = frameworkController
hpaController := NewHorizontalController(
eventClient.Core(),
testClient.Extensions(),
testClient.Autoscaling(),
replicaCalc,
informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
controller.NoResyncPeriodFunc(),
)
hpaController.hpaListerSynced = alwaysReady
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
go hpaController.Run(stop)
tc.Lock()

View File

@ -285,6 +285,7 @@ func ClusterRoles() []rbac.ClusterRole {
rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(autoscalingGroup).Resources("horizontalpodautoscalers").RuleOrDie(),
},
},
{

View File

@ -489,6 +489,13 @@ items:
verbs:
- list
- watch
- apiGroups:
- autoscaling
resources:
- horizontalpodautoscalers
verbs:
- list
- watch
- apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata: