diff --git a/cmd/kube-controller-manager/app/autoscaling.go b/cmd/kube-controller-manager/app/autoscaling.go index c3a42c5780e..b4942f9a2c2 100644 --- a/cmd/kube-controller-manager/app/autoscaling.go +++ b/cmd/kube-controller-manager/app/autoscaling.go @@ -44,6 +44,7 @@ func startHPAController(ctx ControllerContext) (bool, error) { hpaClient.Extensions(), hpaClient.Autoscaling(), replicaCalc, + ctx.NewInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), ctx.Options.HorizontalPodAutoscalerSyncPeriod.Duration, ).Run(ctx.Stop) return true, nil diff --git a/pkg/controller/podautoscaler/BUILD b/pkg/controller/podautoscaler/BUILD index 14a225e0534..0a6e8c10c97 100644 --- a/pkg/controller/podautoscaler/BUILD +++ b/pkg/controller/podautoscaler/BUILD @@ -24,15 +24,15 @@ go_library( "//pkg/client/clientset_generated/clientset/typed/autoscaling/v1:go_default_library", "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library", "//pkg/client/clientset_generated/clientset/typed/extensions/v1beta1:go_default_library", + "//pkg/client/informers/informers_generated/autoscaling/v1:go_default_library", + "//pkg/client/listers/autoscaling/v1:go_default_library", "//pkg/controller/podautoscaler/metrics:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/sets", - "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/tools/cache", @@ -49,7 +49,6 @@ go_test( library = ":go_default_library", tags = ["automanaged"], deps = [ - "//pkg/api:go_default_library", "//pkg/api/unversioned:go_default_library", "//pkg/api/v1:go_default_library", "//pkg/apis/autoscaling/install:go_default_library", @@ -57,6 +56,8 @@ go_test( "//pkg/apis/extensions/install:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library", + "//pkg/client/informers/informers_generated:go_default_library", + "//pkg/controller:go_default_library", "//pkg/controller/podautoscaler/metrics:go_default_library", "//vendor:github.com/stretchr/testify/assert", "//vendor:github.com/stretchr/testify/require", @@ -65,11 +66,9 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/kubernetes/fake", - "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/rest", "//vendor:k8s.io/client-go/testing", - "//vendor:k8s.io/client-go/tools/record", "//vendor:k8s.io/heapster/metrics/api/v1/types", "//vendor:k8s.io/heapster/metrics/apis/metrics/v1alpha1", ], diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 27fb772eae0..dac09501e1d 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -25,9 +25,7 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/watch" v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" @@ -38,6 +36,8 @@ import ( extensionsv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" unversionedautoscaling "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/autoscaling/v1" unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1" + autoscalinginformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/autoscaling/v1" + autoscalinglisters "k8s.io/kubernetes/pkg/client/listers/autoscaling/v1" ) const ( @@ -65,27 +65,36 @@ type HorizontalController struct { replicaCalc *ReplicaCalculator eventRecorder record.EventRecorder - // A store of HPA objects, populated by the controller. - store cache.Store - // Watches changes to all HPA objects. - controller cache.Controller + // hpaLister is able to list/get HPAs from the shared cache from the informer passed in to + // NewHorizontalController. + hpaLister autoscalinglisters.HorizontalPodAutoscalerLister + hpaListerSynced cache.InformerSynced } var downscaleForbiddenWindow = 5 * time.Minute var upscaleForbiddenWindow = 3 * time.Minute -func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, cache.Controller) { - return cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return controller.hpaNamespacer.HorizontalPodAutoscalers(metav1.NamespaceAll).List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return controller.hpaNamespacer.HorizontalPodAutoscalers(metav1.NamespaceAll).Watch(options) - }, - }, - &autoscaling.HorizontalPodAutoscaler{}, - resyncPeriod, +func NewHorizontalController( + evtNamespacer v1core.EventsGetter, + scaleNamespacer unversionedextensions.ScalesGetter, + hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, + replicaCalc *ReplicaCalculator, + hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer, + resyncPeriod time.Duration, +) *HorizontalController { + broadcaster := record.NewBroadcaster() + // TODO: remove the wrapper when every clients have moved to use the clientset. + broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")}) + recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"}) + + controller := &HorizontalController{ + replicaCalc: replicaCalc, + eventRecorder: recorder, + scaleNamespacer: scaleNamespacer, + hpaNamespacer: hpaNamespacer, + } + + hpaInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { hpa := obj.(*autoscaling.HorizontalPodAutoscaler) @@ -108,31 +117,24 @@ func newInformer(controller *HorizontalController, resyncPeriod time.Duration) ( }, // We are not interested in deletions. }, + resyncPeriod, ) -} - -func NewHorizontalController(evtNamespacer v1core.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, replicaCalc *ReplicaCalculator, resyncPeriod time.Duration) *HorizontalController { - broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")}) - recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"}) - - controller := &HorizontalController{ - replicaCalc: replicaCalc, - eventRecorder: recorder, - scaleNamespacer: scaleNamespacer, - hpaNamespacer: hpaNamespacer, - } - store, frameworkController := newInformer(controller, resyncPeriod) - controller.store = store - controller.controller = frameworkController + controller.hpaLister = hpaInformer.Lister() + controller.hpaListerSynced = hpaInformer.Informer().HasSynced return controller } func (a *HorizontalController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() + glog.Infof("Starting HPA Controller") - go a.controller.Run(stopCh) + + if !cache.WaitForCacheSync(stopCh, a.hpaListerSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } + <-stopCh glog.Infof("Shutting down HPA Controller") } @@ -412,11 +414,18 @@ func shouldScale(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desi func (a *HorizontalController) updateCurrentReplicasInStatus(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas int32) { err := a.updateStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentCPUUtilizationPercentage, hpa.Annotations[HpaCustomMetricsStatusAnnotationName], false) if err != nil { - glog.Errorf("%v", err) + utilruntime.HandleError(err) } } func (a *HorizontalController) updateStatus(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, cpuCurrentUtilization *int32, cmStatus string, rescale bool) error { + // Make a copy so we don't mutate the object in the shared cache + copy, err := api.Scheme.DeepCopy(hpa) + if err != nil { + return nil + } + hpa = copy.(*autoscaling.HorizontalPodAutoscaler) + hpa.Status = autoscaling.HorizontalPodAutoscalerStatus{ CurrentReplicas: currentReplicas, DesiredReplicas: desiredReplicas, @@ -432,7 +441,7 @@ func (a *HorizontalController) updateStatus(hpa *autoscaling.HorizontalPodAutosc hpa.Status.LastScaleTime = &now } - _, err := a.hpaNamespacer.HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(hpa) + _, err = a.hpaNamespacer.HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(hpa) if err != nil { a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedUpdateStatus", err.Error()) return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err) diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go index 7c0fe0cd319..159026548fb 100644 --- a/pkg/controller/podautoscaler/horizontal_test.go +++ b/pkg/controller/podautoscaler/horizontal_test.go @@ -32,17 +32,16 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" clientfake "k8s.io/client-go/kubernetes/fake" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" restclient "k8s.io/client-go/rest" core "k8s.io/client-go/testing" - "k8s.io/client-go/tools/record" - "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" autoscaling "k8s.io/kubernetes/pkg/apis/autoscaling/v1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" heapster "k8s.io/heapster/metrics/api/v1/types" @@ -54,6 +53,8 @@ import ( _ "k8s.io/kubernetes/pkg/apis/extensions/install" ) +func alwaysReady() bool { return true } + func (w fakeResponseWrapper) DoRaw() ([]byte, error) { return w.raw, nil } @@ -468,28 +469,26 @@ func (tc *testCase) runTest(t *testing.T) { return true, obj, nil }) - broadcaster := record.NewBroadcasterForTests(0) - broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: eventClient.Core().Events("")}) - recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"}) - replicaCalc := &ReplicaCalculator{ metricsClient: metricsClient, podsGetter: testClient.Core(), } - hpaController := &HorizontalController{ - replicaCalc: replicaCalc, - eventRecorder: recorder, - scaleNamespacer: testClient.Extensions(), - hpaNamespacer: testClient.Autoscaling(), - } + informerFactory := informers.NewSharedInformerFactory(nil, testClient, controller.NoResyncPeriodFunc()) - store, frameworkController := newInformer(hpaController, time.Minute) - hpaController.store = store - hpaController.controller = frameworkController + hpaController := NewHorizontalController( + eventClient.Core(), + testClient.Extensions(), + testClient.Autoscaling(), + replicaCalc, + informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), + controller.NoResyncPeriodFunc(), + ) + hpaController.hpaListerSynced = alwaysReady stop := make(chan struct{}) defer close(stop) + informerFactory.Start(stop) go hpaController.Run(stop) tc.Lock() diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go index 17e834efc74..e0ee757a044 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go @@ -285,6 +285,7 @@ func ClusterRoles() []rbac.ClusterRole { rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(), rbac.NewRule("list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(), rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), + rbac.NewRule("list", "watch").Groups(autoscalingGroup).Resources("horizontalpodautoscalers").RuleOrDie(), }, }, { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml index 737a07b8a8a..99a6d8164de 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml @@ -489,6 +489,13 @@ items: verbs: - list - watch + - apiGroups: + - autoscaling + resources: + - horizontalpodautoscalers + verbs: + - list + - watch - apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRole metadata: