diff --git a/pkg/controller/podautoscaler/BUILD b/pkg/controller/podautoscaler/BUILD index 65e6324f5c4..da72c1c7bc5 100644 --- a/pkg/controller/podautoscaler/BUILD +++ b/pkg/controller/podautoscaler/BUILD @@ -13,6 +13,7 @@ go_library( srcs = [ "doc.go", "horizontal.go", + "rate_limitters.go", "replica_calculator.go", ], tags = ["automanaged"], @@ -27,8 +28,10 @@ go_library( "//pkg/client/clientset_generated/clientset/typed/extensions/v1beta1:go_default_library", "//pkg/client/informers/informers_generated/externalversions/autoscaling/v1:go_default_library", "//pkg/client/listers/autoscaling/v1:go_default_library", + "//pkg/controller:go_default_library", "//pkg/controller/podautoscaler/metrics:go_default_library", "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", @@ -36,10 +39,12 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/sets", + "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/record", + "//vendor:k8s.io/client-go/util/workqueue", ], ) diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 8d3cde79fda..7cb334762ca 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -22,16 +22,19 @@ import ( "time" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" clientv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" autoscalingv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1" @@ -41,6 +44,7 @@ import ( extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1" autoscalinginformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/autoscaling/v1" autoscalinglisters "k8s.io/kubernetes/pkg/client/listers/autoscaling/v1" + "k8s.io/kubernetes/pkg/controller" ) const ( @@ -85,6 +89,9 @@ type HorizontalController struct { // NewHorizontalController. 
 	hpaLister       autoscalinglisters.HorizontalPodAutoscalerLister
 	hpaListerSynced cache.InformerSynced
+
+	// queue contains the keys of the HPA objects that need to be synced
+	queue workqueue.RateLimitingInterface
 }
 
 var downscaleForbiddenWindow = 5 * time.Minute
@@ -103,41 +110,31 @@ func NewHorizontalController(
 	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
 	recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"})
 
-	controller := &HorizontalController{
+	hpaController := &HorizontalController{
 		replicaCalc:     replicaCalc,
 		eventRecorder:   recorder,
 		scaleNamespacer: scaleNamespacer,
 		hpaNamespacer:   hpaNamespacer,
+		queue:           workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
 	}
 
 	hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
-				err := controller.reconcileAutoscaler(hpa)
-				if err != nil {
-					glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
-				}
-			},
-			UpdateFunc: func(old, cur interface{}) {
-				hpa := cur.(*autoscalingv1.HorizontalPodAutoscaler)
-				err := controller.reconcileAutoscaler(hpa)
-				if err != nil {
-					glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
-				}
-			},
-			// We are not interested in deletions.
+			AddFunc:    hpaController.enqueueHPA,
+			UpdateFunc: hpaController.updateHPA,
+			DeleteFunc: hpaController.deleteHPA,
 		},
 		resyncPeriod,
 	)
-	controller.hpaLister = hpaInformer.Lister()
-	controller.hpaListerSynced = hpaInformer.Informer().HasSynced
+	hpaController.hpaLister = hpaInformer.Lister()
+	hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced
 
-	return controller
+	return hpaController
 }
 
 func (a *HorizontalController) Run(stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer a.queue.ShutDown()
 
 	glog.Infof("Starting HPA Controller")
 
@@ -146,10 +143,65 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) {
 		return
 	}
 
+	// start a single worker (we may wish to start more in the future)
+	go wait.Until(a.worker, time.Second, stopCh)
+
 	<-stopCh
 	glog.Infof("Shutting down HPA Controller")
}
 
+// updateHPA re-enqueues the current version of the HPA on every update notification.
+func (a *HorizontalController) updateHPA(old, cur interface{}) {
+	a.enqueueHPA(cur)
+}
+
+// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
+func (a *HorizontalController) enqueueHPA(obj interface{}) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
+		return
+	}
+
+	// always add rate-limited so we don't fetch metrics more than once per resync interval
+	a.queue.AddRateLimited(key)
+}
+
+// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
+func (a *HorizontalController) deleteHPA(obj interface{}) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
+		return
+	}
+
+	// TODO: could we leak if we fail to get the key?
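+	// Note: Forget only clears the rate limiter's tracking for this key; it does
+	// not remove an entry that is already sitting in the queue.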
+	a.queue.Forget(key)
+}
+
+func (a *HorizontalController) worker() {
+	for a.processNextWorkItem() {
+	}
+	glog.Infof("horizontal pod autoscaler controller worker shutting down")
+}
+
+func (a *HorizontalController) processNextWorkItem() bool {
+	key, quit := a.queue.Get()
+	if quit {
+		return false
+	}
+	defer a.queue.Done(key)
+
+	err := a.reconcileKey(key.(string))
+	if err == nil {
+		// don't call Forget here: we want to process a given HPA only once per resync interval
+		return true
+	}
+
+	a.queue.AddRateLimited(key)
+	utilruntime.HandleError(err)
+	return true
+}
+
 // Computes the desired number of replicas for the metric specifications listed in the HPA, returning the maximum
 // of the computed replica counts, a description of the associated metric, and the statuses of all metrics
 // computed.
@@ -275,6 +327,21 @@ func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.Hori
 	return replicas, metric, statuses, timestamp, nil
 }
 
+func (a *HorizontalController) reconcileKey(key string) error {
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+
+	hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
+	if errors.IsNotFound(err) {
+		glog.Infof("Horizontal Pod Autoscaler %s has been deleted", key)
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+
+	return a.reconcileAutoscaler(hpa)
+}
+
 func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler) error {
 	// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
 	hpav1Raw, err := api.Scheme.DeepCopy(hpav1Shared)
diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go
index 9a7a88d2dad..62354f5c2cb 100644
--- a/pkg/controller/podautoscaler/horizontal_test.go
+++ b/pkg/controller/podautoscaler/horizontal_test.go
@@ -490,7 +490,7 @@ func (tc *testCase) runTest(t *testing.T) {
 	)
 
 	eventClient := &clientfake.Clientset{}
-	eventClient.AddReactor("*", "events", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+	eventClient.AddReactor("create", "events", func(action core.Action) (handled bool, ret runtime.Object, err error) {
 		tc.Lock()
 		defer tc.Unlock()
diff --git a/pkg/controller/podautoscaler/rate_limitters.go b/pkg/controller/podautoscaler/rate_limitters.go
new file mode 100644
index 00000000000..86dd0c09891
--- /dev/null
+++ b/pkg/controller/podautoscaler/rate_limitters.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podautoscaler
+
+import (
+	"time"
+
+	"k8s.io/client-go/util/workqueue"
+)
+
+// FixedItemIntervalRateLimiter limits items to a fixed interval
+type FixedItemIntervalRateLimiter struct {
+	interval time.Duration
+}
+
+var _ workqueue.RateLimiter = &FixedItemIntervalRateLimiter{}
+
+// NewFixedItemIntervalRateLimiter creates a new instance of a RateLimiter using a fixed interval
+func NewFixedItemIntervalRateLimiter(interval time.Duration) workqueue.RateLimiter {
+	return &FixedItemIntervalRateLimiter{
+		interval: interval,
+	}
+}
+
+// When returns the fixed interval, regardless of the item
+func (r *FixedItemIntervalRateLimiter) When(item interface{}) time.Duration {
+	return r.interval
+}
+
+// NumRequeues returns back how many failures the item has had
+func (r *FixedItemIntervalRateLimiter) NumRequeues(item interface{}) int {
+	return 1
+}
+
+// Forget indicates that an item is done being retried
+func (r *FixedItemIntervalRateLimiter) Forget(item interface{}) {
+}
+
+// NewDefaultHPARateLimiter creates a rate limiter which limits each HPA key to
+// one sync per resync interval
+func NewDefaultHPARateLimiter(interval time.Duration) workqueue.RateLimiter {
+	return NewFixedItemIntervalRateLimiter(interval)
+}
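
To see the fixed-interval limiter's effect end to end, here is a minimal standalone sketch of the same queue pattern. It is not part of the patch: the fixedIntervalLimiter type, the "default/my-hpa" key, and the 500ms interval are illustrative stand-ins, and only the k8s.io/client-go/util/workqueue API is assumed.

	package main

	import (
		"fmt"
		"time"

		"k8s.io/client-go/util/workqueue"
	)

	// fixedIntervalLimiter is a stand-in for the FixedItemIntervalRateLimiter above.
	type fixedIntervalLimiter struct{ interval time.Duration }

	func (r *fixedIntervalLimiter) When(item interface{}) time.Duration { return r.interval }
	func (r *fixedIntervalLimiter) NumRequeues(item interface{}) int    { return 1 }
	func (r *fixedIntervalLimiter) Forget(item interface{})             {}

	func main() {
		queue := workqueue.NewNamedRateLimitingQueue(
			&fixedIntervalLimiter{interval: 500 * time.Millisecond}, "example")

		// AddRateLimited delays each add by When(key), so a key re-added right
		// after a sync is not handed out again until the interval has elapsed.
		queue.AddRateLimited("default/my-hpa")

		go func() {
			for {
				key, quit := queue.Get()
				if quit {
					return
				}
				fmt.Printf("synced %v at %v\n", key, time.Now().Format("15:04:05.000"))
				// Mirror processNextWorkItem: re-enqueue through the rate limiter
				// rather than Forget, scheduling the next sync one interval out.
				queue.AddRateLimited(key)
				queue.Done(key)
			}
		}()

		time.Sleep(2 * time.Second)
		queue.ShutDown()
	}

Running this prints a few "synced" lines roughly half a second apart: each key is handed out at most once per interval because the worker re-enqueues through the rate limiter instead of calling Forget, which is how processNextWorkItem keeps metrics fetches to one per HPA per resync interval.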