Merge pull request #41214 from ncdc/shared-informers-06-hpa

Automatic merge from submit-queue (batch tested with PRs 41248, 41214)

Switch hpa controller to shared informer

**What this PR does / why we need it**: switch the hpa controller to use a shared informer

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

**Special notes for your reviewer**: Only the last commit is relevant. The others are from #40759, #41114, #41148  

**Release note**:

```release-note
```

cc @smarterclayton @deads2k @sttts @liggitt @DirectXMan12 @timothysc @kubernetes/sig-scalability-pr-reviews @jszczepkowski @mwielgus @piosz
Kubernetes Submit Queue 2017-02-10 10:03:44 -08:00 committed by GitHub
commit 03bde62666
11 changed files with 259 additions and 239 deletions
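For reviewers who have not touched the shared-informer machinery yet, the conversion below follows the same shape as the controllers already migrated: the controller stops building its own `cache.NewInformer`/`ListWatch`, accepts a typed informer from the shared factory, keeps only a lister plus a `HasSynced` function, and refuses to process work until the shared cache has synced. A minimal, hedged sketch of that shape — `FooController`, its queue wiring, and the enqueue helper are illustrative and not part of this PR; the informer/lister packages are the generated ones referenced in the diff:

```go
package foo

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1"
	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)

// FooController is a hypothetical consumer of a shared Pod informer.
type FooController struct {
	podLister       corelisters.PodLister // reads from the informer's shared cache
	podListerSynced cache.InformerSynced  // reports whether that cache has synced
	queue           workqueue.RateLimitingInterface
}

func NewFooController(podInformer coreinformers.PodInformer) *FooController {
	c := &FooController{
		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "foo"),
	}
	// Handlers are registered on the shared informer; other controllers can
	// register their own handlers on the very same informer and watch.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.enqueue,
		UpdateFunc: func(old, cur interface{}) { c.enqueue(cur) },
		DeleteFunc: c.enqueue,
	})
	c.podLister = podInformer.Lister()
	c.podListerSynced = podInformer.Informer().HasSynced
	return c
}

func (c *FooController) enqueue(obj interface{}) {
	// Handles DeletedFinalStateUnknown tombstones as well as live objects.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.queue.Add(key)
}

func (c *FooController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	// The factory is started elsewhere (ctx.NewInformerFactory.Start in the
	// controller manager); here we only wait for the shared cache to fill.
	if !cache.WaitForCacheSync(stopCh, c.podListerSynced) {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}
	<-stopCh
}
```

The hunks below apply exactly this shape to the disruption and HPA controllers; matching worker and sync sketches for the queue side appear after the Run() and sync() hunks of the disruption controller.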

View File

@@ -44,6 +44,7 @@ func startHPAController(ctx ControllerContext) (bool, error) {
 		hpaClient.Extensions(),
 		hpaClient.Autoscaling(),
 		replicaCalc,
+		ctx.NewInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
 		ctx.Options.HorizontalPodAutoscalerSyncPeriod.Duration,
 	).Run(ctx.Stop)
 	return true, nil

View File

@@ -30,7 +30,12 @@ func startDisruptionController(ctx ControllerContext) (bool, error) {
 		return false, nil
 	}
 	go disruption.NewDisruptionController(
-		ctx.InformerFactory.Pods().Informer(),
+		ctx.NewInformerFactory.Core().V1().Pods(),
+		ctx.NewInformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
+		ctx.NewInformerFactory.Core().V1().ReplicationControllers(),
+		ctx.NewInformerFactory.Extensions().V1beta1().ReplicaSets(),
+		ctx.NewInformerFactory.Extensions().V1beta1().Deployments(),
+		ctx.NewInformerFactory.Apps().V1beta1().StatefulSets(),
 		ctx.ClientBuilder.ClientOrDie("disruption-controller"),
 	).Run(ctx.Stop)
 	return true, nil

View File

@@ -15,21 +15,25 @@ go_library(
     deps = [
         "//pkg/api:go_default_library",
         "//pkg/api/v1:go_default_library",
-        "//pkg/apis/apps/v1beta1:go_default_library",
-        "//pkg/apis/extensions/v1beta1:go_default_library",
         "//pkg/apis/policy/v1beta1:go_default_library",
         "//pkg/client/clientset_generated/clientset:go_default_library",
         "//pkg/client/clientset_generated/clientset/typed/policy/v1beta1:go_default_library",
-        "//pkg/client/legacylisters:go_default_library",
+        "//pkg/client/informers/informers_generated/apps/v1beta1:go_default_library",
+        "//pkg/client/informers/informers_generated/core/v1:go_default_library",
+        "//pkg/client/informers/informers_generated/extensions/v1beta1:go_default_library",
+        "//pkg/client/informers/informers_generated/policy/v1beta1:go_default_library",
+        "//pkg/client/listers/apps/v1beta1:go_default_library",
+        "//pkg/client/listers/core/v1:go_default_library",
+        "//pkg/client/listers/extensions/v1beta1:go_default_library",
+        "//pkg/client/listers/policy/v1beta1:go_default_library",
         "//pkg/controller:go_default_library",
         "//vendor:github.com/golang/glog",
+        "//vendor:k8s.io/apimachinery/pkg/api/errors",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
-        "//vendor:k8s.io/apimachinery/pkg/runtime",
         "//vendor:k8s.io/apimachinery/pkg/types",
         "//vendor:k8s.io/apimachinery/pkg/util/intstr",
         "//vendor:k8s.io/apimachinery/pkg/util/runtime",
         "//vendor:k8s.io/apimachinery/pkg/util/wait",
-        "//vendor:k8s.io/apimachinery/pkg/watch",
         "//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
         "//vendor:k8s.io/client-go/pkg/api/v1",
         "//vendor:k8s.io/client-go/tools/cache",
@@ -49,14 +53,12 @@ go_test(
         "//pkg/apis/apps/v1beta1:go_default_library",
         "//pkg/apis/extensions/v1beta1:go_default_library",
         "//pkg/apis/policy/v1beta1:go_default_library",
-        "//pkg/client/legacylisters:go_default_library",
+        "//pkg/client/informers/informers_generated:go_default_library",
         "//pkg/controller:go_default_library",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
         "//vendor:k8s.io/apimachinery/pkg/util/intstr",
         "//vendor:k8s.io/apimachinery/pkg/util/uuid",
-        "//vendor:k8s.io/client-go/pkg/api/v1",
         "//vendor:k8s.io/client-go/tools/cache",
-        "//vendor:k8s.io/client-go/tools/record",
         "//vendor:k8s.io/client-go/util/workqueue",
     ],
 )

View File

@@ -21,13 +21,12 @@ import (
 	"reflect"
 	"time"
 
+	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/apimachinery/pkg/watch"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	clientv1 "k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/tools/cache"
@@ -35,12 +34,17 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
-	apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
-	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
 	policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
 	policyclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/policy/v1beta1"
-	"k8s.io/kubernetes/pkg/client/legacylisters"
+	appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/apps/v1beta1"
+	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1"
+	extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/extensions/v1beta1"
+	policyinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/policy/v1beta1"
+	appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
+	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
+	extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
+	policylisters "k8s.io/kubernetes/pkg/client/listers/policy/v1beta1"
 	"k8s.io/kubernetes/pkg/controller"
 
 	"github.com/golang/glog"
@@ -64,28 +68,23 @@ type updater func(*policy.PodDisruptionBudget) error
 type DisruptionController struct {
 	kubeClient clientset.Interface
 
-	pdbStore      cache.Store
-	pdbController cache.Controller
-	pdbLister     listers.StoreToPodDisruptionBudgetLister
+	pdbLister       policylisters.PodDisruptionBudgetLister
+	pdbListerSynced cache.InformerSynced
 
-	podController cache.Controller
-	podLister     listers.StoreToPodLister
+	podLister       corelisters.PodLister
+	podListerSynced cache.InformerSynced
 
-	rcIndexer    cache.Indexer
-	rcController cache.Controller
-	rcLister     listers.StoreToReplicationControllerLister
+	rcLister       corelisters.ReplicationControllerLister
+	rcListerSynced cache.InformerSynced
 
-	rsStore      cache.Store
-	rsController cache.Controller
-	rsLister     listers.StoreToReplicaSetLister
+	rsLister       extensionslisters.ReplicaSetLister
+	rsListerSynced cache.InformerSynced
 
-	dIndexer    cache.Indexer
-	dController cache.Controller
-	dLister     listers.StoreToDeploymentLister
+	dLister       extensionslisters.DeploymentLister
+	dListerSynced cache.InformerSynced
 
-	ssStore      cache.Store
-	ssController cache.Controller
-	ssLister     listers.StoreToStatefulSetLister
+	ssLister       appslisters.StatefulSetLister
+	ssListerSynced cache.InformerSynced
 
 	// PodDisruptionBudget keys that need to be synced.
 	queue workqueue.RateLimitingInterface
@@ -108,10 +107,17 @@ type controllerAndScale struct {
 // controllers and their scale.
 type podControllerFinder func(*v1.Pod) ([]controllerAndScale, error)
 
-func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface) *DisruptionController {
+func NewDisruptionController(
+	podInformer coreinformers.PodInformer,
+	pdbInformer policyinformers.PodDisruptionBudgetInformer,
+	rcInformer coreinformers.ReplicationControllerInformer,
+	rsInformer extensionsinformers.ReplicaSetInformer,
+	dInformer extensionsinformers.DeploymentInformer,
+	ssInformer appsinformers.StatefulSetInformer,
+	kubeClient clientset.Interface,
+) *DisruptionController {
 	dc := &DisruptionController{
 		kubeClient:    kubeClient,
-		podController: podInformer.GetController(),
 		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
 		recheckQueue:  workqueue.NewNamedDelayingQueue("disruption-recheck"),
 		broadcaster:   record.NewBroadcaster(),
@@ -120,96 +126,36 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient c
 	dc.getUpdater = func() updater { return dc.writePdbStatus }
 
-	dc.podLister.Indexer = podInformer.GetIndexer()
-
-	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    dc.addPod,
 		UpdateFunc: dc.updatePod,
 		DeleteFunc: dc.deletePod,
 	})
+	dc.podLister = podInformer.Lister()
+	dc.podListerSynced = podInformer.Informer().HasSynced
 
-	dc.pdbStore, dc.pdbController = cache.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				return dc.kubeClient.Policy().PodDisruptionBudgets(metav1.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return dc.kubeClient.Policy().PodDisruptionBudgets(metav1.NamespaceAll).Watch(options)
-			},
-		},
-		&policy.PodDisruptionBudget{},
-		30*time.Second,
+	pdbInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc:    dc.addDb,
 			UpdateFunc: dc.updateDb,
 			DeleteFunc: dc.removeDb,
 		},
+		30*time.Second,
 	)
-	dc.pdbLister.Store = dc.pdbStore
-
-	dc.rcIndexer, dc.rcController = cache.NewIndexerInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				return dc.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return dc.kubeClient.Core().ReplicationControllers(metav1.NamespaceAll).Watch(options)
-			},
-		},
-		&v1.ReplicationController{},
-		30*time.Second,
-		cache.ResourceEventHandlerFuncs{},
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-	)
-
-	dc.rcLister.Indexer = dc.rcIndexer
-
-	dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				return dc.kubeClient.Extensions().ReplicaSets(metav1.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return dc.kubeClient.Extensions().ReplicaSets(metav1.NamespaceAll).Watch(options)
-			},
-		},
-		&extensions.ReplicaSet{},
-		30*time.Second,
-		cache.ResourceEventHandlerFuncs{},
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-	)
-	dc.rsStore = dc.rsLister.Indexer
-
-	dc.dIndexer, dc.dController = cache.NewIndexerInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				return dc.kubeClient.Extensions().Deployments(metav1.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return dc.kubeClient.Extensions().Deployments(metav1.NamespaceAll).Watch(options)
-			},
-		},
-		&extensions.Deployment{},
-		30*time.Second,
-		cache.ResourceEventHandlerFuncs{},
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-	)
-	dc.dLister.Indexer = dc.dIndexer
-
-	dc.ssStore, dc.ssController = cache.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				return dc.kubeClient.Apps().StatefulSets(metav1.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return dc.kubeClient.Apps().StatefulSets(metav1.NamespaceAll).Watch(options)
-			},
-		},
-		&apps.StatefulSet{},
-		30*time.Second,
-		cache.ResourceEventHandlerFuncs{},
-	)
-	dc.ssLister.Store = dc.ssStore
+	dc.pdbLister = pdbInformer.Lister()
+	dc.pdbListerSynced = pdbInformer.Informer().HasSynced
+
+	dc.rcLister = rcInformer.Lister()
+	dc.rcListerSynced = rcInformer.Informer().HasSynced
+
+	dc.rsLister = rsInformer.Lister()
+	dc.rsListerSynced = rsInformer.Informer().HasSynced
+
+	dc.dLister = dInformer.Lister()
+	dc.dListerSynced = dInformer.Informer().HasSynced
+
+	dc.ssLister = ssInformer.Lister()
+	dc.ssListerSynced = ssInformer.Informer().HasSynced
 
 	return dc
 }
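The payoff of the rewiring above is that every controller interested in a resource now shares one watch and one in-memory cache, instead of each opening its own `ListWatch` with a private 30-second resync. A hedged wiring sketch, reusing the hypothetical `FooController` from the earlier example — `NewBarController` and the surrounding function are placeholders, while `informers` and `clientset` are the generated packages already used in this diff:

```go
// Two controllers, one underlying watch per resource type. The factory only
// starts each shared informer once, no matter how many handlers were added.
func startControllers(client clientset.Interface, stopCh <-chan struct{}) {
	factory := informers.NewSharedInformerFactory(nil, client, 12*time.Hour)

	foo := NewFooController(factory.Core().V1().Pods())
	bar := NewBarController(factory.Core().V1().Pods()) // hypothetical second consumer

	factory.Start(stopCh)
	go foo.Run(stopCh)
	go bar.Run(stopCh)
}
```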
@@ -317,19 +263,22 @@ func (dc *DisruptionController) getPodReplicationControllers(pod *v1.Pod) ([]con
 }
 
 func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+	defer dc.queue.ShutDown()
+
 	glog.V(0).Infof("Starting disruption controller")
+
+	if !cache.WaitForCacheSync(stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
+		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+		return
+	}
+
 	if dc.kubeClient != nil {
 		glog.V(0).Infof("Sending events to api server.")
 		dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(dc.kubeClient.Core().RESTClient()).Events("")})
 	} else {
 		glog.V(0).Infof("No api server defined - no events will be sent to API server.")
 	}
-	go dc.pdbController.Run(stopCh)
-	go dc.podController.Run(stopCh)
-	go dc.rcController.Run(stopCh)
-	go dc.rsController.Run(stopCh)
-	go dc.dController.Run(stopCh)
-	go dc.ssController.Run(stopCh)
+
 	go wait.Until(dc.worker, time.Second, stopCh)
 	go wait.Until(dc.recheckWorker, time.Second, stopCh)
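`dc.worker` itself is outside this hunk, so as a reference here is the usual shape of the loop that `wait.Until(..., time.Second, stopCh)` keeps running: pull a key, sync it, and either `Forget` it or requeue it with rate limiting. This continues the hypothetical `FooController` from the sketch near the top (same imports); `c.sync` is filled in after the sync() hunk below:

```go
func (c *FooController) worker() {
	for c.processNextWorkItem() {
	}
}

func (c *FooController) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	if err := c.sync(key.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("error syncing %q: %v", key, err))
		c.queue.AddRateLimited(key) // retry later with backoff
		return true
	}
	c.queue.Forget(key) // clear rate-limiting history on success
	return true
}
```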
@@ -442,7 +391,7 @@ func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionB
 		glog.Warning(msg)
 		dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
 	}
-	return &pdbs[0]
+	return pdbs[0]
 }
 
 func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*v1.Pod, error) {
@@ -510,8 +459,12 @@ func (dc *DisruptionController) sync(key string) error {
 		glog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	obj, exists, err := dc.pdbLister.Store.GetByKey(key)
-	if !exists {
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+	pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
+	if errors.IsNotFound(err) {
 		glog.V(4).Infof("PodDisruptionBudget %q has been deleted", key)
 		return nil
 	}
@@ -519,8 +472,6 @@ func (dc *DisruptionController) sync(key string) error {
 		return err
 	}
 
-	pdb := obj.(*policy.PodDisruptionBudget)
-
 	if err := dc.trySync(pdb); err != nil {
 		glog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
 		return dc.failSafe(pdb)
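The queue keys resolved above are plain "namespace/name" strings (produced on the enqueue side by `cache.MetaNamespaceKeyFunc` or `cache.DeletionHandlingMetaNamespaceKeyFunc`), which is why the sync path can go straight to the namespaced lister. Completing the hypothetical `FooController` from the earlier sketches, the matching `sync` looks like this; the `errors` package is `k8s.io/apimachinery/pkg/api/errors`, as in the import hunk above:

```go
func (c *FooController) sync(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	pod, err := c.podLister.Pods(namespace).Get(name)
	if errors.IsNotFound(err) {
		// Already gone from the shared cache; nothing left to do.
		return nil
	}
	if err != nil {
		return err
	}

	// Treat pod as read-only here: it is shared with every other consumer of
	// the informer cache (see the DeepCopy note after the HPA hunks below).
	_ = pod
	return nil
}
```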

View File

@@ -26,21 +26,21 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/uuid"
-	clientv1 "k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
 	apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
 	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
 	policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
-	"k8s.io/kubernetes/pkg/client/legacylisters"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
 	"k8s.io/kubernetes/pkg/controller"
 )
 
 type pdbStates map[string]policy.PodDisruptionBudget
 
+var alwaysReady = func() bool { return true }
+
 func (ps *pdbStates) Set(pdb *policy.PodDisruptionBudget) error {
 	key, err := controller.KeyFunc(pdb)
 	if err != nil {
@@ -85,23 +85,48 @@ func (ps *pdbStates) VerifyDisruptionAllowed(t *testing.T, key string, disruptio
 	}
 }
 
-func newFakeDisruptionController() (*DisruptionController, *pdbStates) {
+type disruptionController struct {
+	*DisruptionController
+
+	podStore cache.Store
+	pdbStore cache.Store
+	rcStore  cache.Store
+	rsStore  cache.Store
+	dStore   cache.Store
+	ssStore  cache.Store
+}
+
+func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 	ps := &pdbStates{}
 
-	dc := &DisruptionController{
-		pdbLister:   listers.StoreToPodDisruptionBudgetLister{Store: cache.NewStore(controller.KeyFunc)},
-		podLister:   listers.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
-		rcLister:    listers.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
-		rsLister:    listers.StoreToReplicaSetLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
-		dLister:     listers.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
-		ssLister:    listers.StoreToStatefulSetLister{Store: cache.NewStore(controller.KeyFunc)},
-		getUpdater:  func() updater { return ps.Set },
-		broadcaster: record.NewBroadcaster(),
-	}
-	dc.recorder = dc.broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "disruption_test"})
+	informerFactory := informers.NewSharedInformerFactory(nil, nil, controller.NoResyncPeriodFunc())
 
-	return dc, ps
+	dc := NewDisruptionController(
+		informerFactory.Core().V1().Pods(),
+		informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
+		informerFactory.Core().V1().ReplicationControllers(),
+		informerFactory.Extensions().V1beta1().ReplicaSets(),
+		informerFactory.Extensions().V1beta1().Deployments(),
+		informerFactory.Apps().V1beta1().StatefulSets(),
+		nil,
+	)
+	dc.getUpdater = func() updater { return ps.Set }
+	dc.podListerSynced = alwaysReady
+	dc.pdbListerSynced = alwaysReady
+	dc.rcListerSynced = alwaysReady
+	dc.rsListerSynced = alwaysReady
+	dc.dListerSynced = alwaysReady
+	dc.ssListerSynced = alwaysReady
+
+	return &disruptionController{
+		dc,
+		informerFactory.Core().V1().Pods().Informer().GetStore(),
+		informerFactory.Policy().V1beta1().PodDisruptionBudgets().Informer().GetStore(),
+		informerFactory.Core().V1().ReplicationControllers().Informer().GetStore(),
+		informerFactory.Extensions().V1beta1().ReplicaSets().Informer().GetStore(),
+		informerFactory.Extensions().V1beta1().Deployments().Informer().GetStore(),
+		informerFactory.Apps().V1beta1().StatefulSets().Informer().GetStore(),
+	}, ps
 }
 
 func fooBar() map[string]string {
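The tests below seed fixtures through `add`/`update` helpers that are not part of this diff; against the `cache.Store` handles exposed by the wrapper struct above they plausibly reduce to the following (a sketch, not the PR's actual helpers):

```go
func add(t *testing.T, store cache.Store, obj interface{}) {
	if err := store.Add(obj); err != nil {
		t.Errorf("unexpected error adding object to store: %v", err)
	}
}

func update(t *testing.T, store cache.Store, obj interface{}) {
	if err := store.Update(obj); err != nil {
		t.Errorf("unexpected error updating object in store: %v", err)
	}
}
```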
@@ -283,11 +308,11 @@ func TestNoSelector(t *testing.T) {
 	pdb.Spec.Selector = &metav1.LabelSelector{}
 	pod, _ := newPod(t, "yo-yo-yo")
 
-	add(t, dc.pdbLister.Store, pdb)
+	add(t, dc.pdbStore, pdb)
 	dc.sync(pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{})
 
-	add(t, dc.podLister.Indexer, pod)
+	add(t, dc.podStore, pod)
 	dc.sync(pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{})
 }
@@ -298,7 +323,7 @@ func TestUnavailable(t *testing.T) {
 	dc, ps := newFakeDisruptionController()
 
 	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromInt(3))
-	add(t, dc.pdbLister.Store, pdb)
+	add(t, dc.pdbStore, pdb)
 	dc.sync(pdbName)
 
 	// Add three pods, verifying that the counts go up at each step.
@@ -307,14 +332,14 @@
 		ps.VerifyPdbStatus(t, pdbName, 0, i, 3, i, map[string]metav1.Time{})
 		pod, _ := newPod(t, fmt.Sprintf("yo-yo-yo %d", i))
 		pods = append(pods, pod)
-		add(t, dc.podLister.Indexer, pod)
+		add(t, dc.podStore, pod)
 		dc.sync(pdbName)
 	}
 	ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4, map[string]metav1.Time{})
 
 	// Now set one pod as unavailable
 	pods[0].Status.Conditions = []v1.PodCondition{}
-	update(t, dc.podLister.Indexer, pods[0])
+	update(t, dc.podStore, pods[0])
 	dc.sync(pdbName)
 
 	// Verify expected update
@@ -327,13 +352,13 @@ func TestNakedPod(t *testing.T) {
 	dc, ps := newFakeDisruptionController()
 
 	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("28%"))
-	add(t, dc.pdbLister.Store, pdb)
+	add(t, dc.pdbStore, pdb)
 	dc.sync(pdbName)
 	// This verifies that when a PDB has 0 pods, disruptions are not allowed.
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
 
 	pod, _ := newPod(t, "naked")
-	add(t, dc.podLister.Indexer, pod)
+	add(t, dc.podStore, pod)
 	dc.sync(pdbName)
 
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
@@ -344,13 +369,13 @@ func TestReplicaSet(t *testing.T) {
 	dc, ps := newFakeDisruptionController()
 
 	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("20%"))
-	add(t, dc.pdbLister.Store, pdb)
+	add(t, dc.pdbStore, pdb)
 
 	rs, _ := newReplicaSet(t, 10)
-	add(t, dc.rsLister.Indexer, rs)
+	add(t, dc.rsStore, rs)
 
 	pod, _ := newPod(t, "pod")
-	add(t, dc.podLister.Indexer, pod)
+	add(t, dc.podStore, pod)
 	dc.sync(pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10, map[string]metav1.Time{})
 }
@@ -363,11 +388,11 @@ func TestMultipleControllers(t *testing.T) {
 	dc, ps := newFakeDisruptionController()
 
 	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("1%"))
-	add(t, dc.pdbLister.Store, pdb)
+	add(t, dc.pdbStore, pdb)
 
 	for i := 0; i < podCount; i++ {
 		pod, _ := newPod(t, fmt.Sprintf("pod %d", i))
-		add(t, dc.podLister.Indexer, pod)
+		add(t, dc.podStore, pod)
 	}
 	dc.sync(pdbName)
@@ -376,7 +401,7 @@ func TestMultipleControllers(t *testing.T) {
 
 	rc, _ := newReplicationController(t, 1)
 	rc.Name = "rc 1"
-	add(t, dc.rcLister.Indexer, rc)
+	add(t, dc.rcStore, rc)
 	dc.sync(pdbName)
 
 	// One RC and 200%>1% healthy => disruption allowed
@@ -384,7 +409,7 @@ func TestMultipleControllers(t *testing.T) {
 
 	rc, _ = newReplicationController(t, 1)
 	rc.Name = "rc 2"
-	add(t, dc.rcLister.Indexer, rc)
+	add(t, dc.rcStore, rc)
 	dc.sync(pdbName)
 
 	// 100%>1% healthy BUT two RCs => no disruption allowed
@@ -405,10 +430,10 @@ func TestReplicationController(t *testing.T) {
 	// 34% should round up to 2
 	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("34%"))
-	add(t, dc.pdbLister.Store, pdb)
+	add(t, dc.pdbStore, pdb)
 	rc, _ := newReplicationController(t, 3)
 	rc.Spec.Selector = labels
-	add(t, dc.rcLister.Indexer, rc)
+	add(t, dc.rcStore, rc)
 	dc.sync(pdbName)
 
 	// It starts out at 0 expected because, with no pods, the PDB doesn't know
@@ -421,7 +446,7 @@ func TestReplicationController(t *testing.T) {
 		pod, _ := newPod(t, fmt.Sprintf("foobar %d", i))
 		pods = append(pods, pod)
 		pod.Labels = labels
-		add(t, dc.podLister.Indexer, pod)
+		add(t, dc.podStore, pod)
 		dc.sync(pdbName)
 		if i < 2 {
 			ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
@@ -431,7 +456,7 @@ func TestReplicationController(t *testing.T) {
 	}
 
 	rogue, _ := newPod(t, "rogue")
-	add(t, dc.podLister.Indexer, rogue)
+	add(t, dc.podStore, rogue)
 	dc.sync(pdbName)
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
 }
@@ -446,9 +471,9 @@ func TestStatefulSetController(t *testing.T) {
 	// 34% should round up to 2
 	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("34%"))
-	add(t, dc.pdbLister.Store, pdb)
+	add(t, dc.pdbStore, pdb)
 	ss, _ := newStatefulSet(t, 3)
-	add(t, dc.ssLister.Store, ss)
+	add(t, dc.ssStore, ss)
 	dc.sync(pdbName)
 
 	// It starts out at 0 expected because, with no pods, the PDB doesn't know
@@ -461,7 +486,7 @@ func TestStatefulSetController(t *testing.T) {
 		pod, _ := newPod(t, fmt.Sprintf("foobar %d", i))
 		pods = append(pods, pod)
 		pod.Labels = labels
-		add(t, dc.podLister.Indexer, pod)
+		add(t, dc.podStore, pod)
 		dc.sync(pdbName)
 		if i < 2 {
 			ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
@@ -494,10 +519,10 @@ func TestTwoControllers(t *testing.T) {
 	const minimumTwo int32 = 7 // integer minimum with two controllers
 
 	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("28%"))
-	add(t, dc.pdbLister.Store, pdb)
+	add(t, dc.pdbStore, pdb)
 	rc, _ := newReplicationController(t, collectionSize)
 	rc.Spec.Selector = rcLabels
-	add(t, dc.rcLister.Indexer, rc)
+	add(t, dc.rcStore, rc)
 	dc.sync(pdbName)
 
 	ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
@@ -512,7 +537,7 @@ func TestTwoControllers(t *testing.T) {
 		if i <= unavailablePods {
 			pod.Status.Conditions = []v1.PodCondition{}
 		}
-		add(t, dc.podLister.Indexer, pod)
+		add(t, dc.podStore, pod)
 		dc.sync(pdbName)
 		if i <= unavailablePods {
 			ps.VerifyPdbStatus(t, pdbName, 0, 0, minimumOne, collectionSize, map[string]metav1.Time{})
@@ -525,14 +550,14 @@ func TestTwoControllers(t *testing.T) {
 
 	d, _ := newDeployment(t, collectionSize)
 	d.Spec.Selector = newSel(dLabels)
-	add(t, dc.dLister.Indexer, d)
+	add(t, dc.dStore, d)
 	dc.sync(pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{})
 
 	rs, _ := newReplicaSet(t, collectionSize)
 	rs.Spec.Selector = newSel(dLabels)
 	rs.Labels = dLabels
-	add(t, dc.rsLister.Indexer, rs)
+	add(t, dc.rsStore, rs)
 	dc.sync(pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{})
@@ -545,7 +570,7 @@ func TestTwoControllers(t *testing.T) {
 		if i <= unavailablePods {
 			pod.Status.Conditions = []v1.PodCondition{}
 		}
-		add(t, dc.podLister.Indexer, pod)
+		add(t, dc.podStore, pod)
 		dc.sync(pdbName)
 		if i <= unavailablePods {
 			ps.VerifyPdbStatus(t, pdbName, 0, minimumOne+1, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
@@ -562,17 +587,17 @@ func TestTwoControllers(t *testing.T) {
 	// verify that a disruption is permitted again.
 	ps.VerifyPdbStatus(t, pdbName, 2, 2+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
 
 	pods[collectionSize-1].Status.Conditions = []v1.PodCondition{}
-	update(t, dc.podLister.Indexer, pods[collectionSize-1])
+	update(t, dc.podStore, pods[collectionSize-1])
 	dc.sync(pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
 
 	pods[collectionSize-2].Status.Conditions = []v1.PodCondition{}
-	update(t, dc.podLister.Indexer, pods[collectionSize-2])
+	update(t, dc.podStore, pods[collectionSize-2])
 	dc.sync(pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
 
 	pods[collectionSize-1].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
-	update(t, dc.podLister.Indexer, pods[collectionSize-1])
+	update(t, dc.podStore, pods[collectionSize-1])
 	dc.sync(pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
 }
@@ -581,7 +606,7 @@ func TestTwoControllers(t *testing.T) {
 func TestPDBNotExist(t *testing.T) {
 	dc, _ := newFakeDisruptionController()
 	pdb, _ := newPodDisruptionBudget(t, intstr.FromString("67%"))
-	add(t, dc.pdbLister.Store, pdb)
+	add(t, dc.pdbStore, pdb)
 	if err := dc.sync("notExist"); err != nil {
 		t.Errorf("Unexpected error: %v, expect nil", err)
 	}
@@ -598,16 +623,16 @@ func TestUpdateDisruptedPods(t *testing.T) {
 		"p3":       {Time: currentTime}, // Should remain, pod untouched.
 		"notthere": {Time: currentTime}, // Should be removed, pod deleted.
 	}
-	add(t, dc.pdbLister.Store, pdb)
+	add(t, dc.pdbStore, pdb)
 
 	pod1, _ := newPod(t, "p1")
 	pod1.DeletionTimestamp = &metav1.Time{Time: time.Now()}
 	pod2, _ := newPod(t, "p2")
 	pod3, _ := newPod(t, "p3")
 
-	add(t, dc.podLister.Indexer, pod1)
-	add(t, dc.podLister.Indexer, pod2)
-	add(t, dc.podLister.Indexer, pod3)
+	add(t, dc.podStore, pod1)
+	add(t, dc.podStore, pod2)
+	add(t, dc.podStore, pod3)
 	dc.sync(pdbName)

View File

@@ -24,15 +24,15 @@ go_library(
         "//pkg/client/clientset_generated/clientset/typed/autoscaling/v1:go_default_library",
         "//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
         "//pkg/client/clientset_generated/clientset/typed/extensions/v1beta1:go_default_library",
+        "//pkg/client/informers/informers_generated/autoscaling/v1:go_default_library",
+        "//pkg/client/listers/autoscaling/v1:go_default_library",
         "//pkg/controller/podautoscaler/metrics:go_default_library",
         "//vendor:github.com/golang/glog",
         "//vendor:k8s.io/apimachinery/pkg/api/resource",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
         "//vendor:k8s.io/apimachinery/pkg/labels",
-        "//vendor:k8s.io/apimachinery/pkg/runtime",
         "//vendor:k8s.io/apimachinery/pkg/util/runtime",
         "//vendor:k8s.io/apimachinery/pkg/util/sets",
-        "//vendor:k8s.io/apimachinery/pkg/watch",
         "//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
         "//vendor:k8s.io/client-go/pkg/api/v1",
         "//vendor:k8s.io/client-go/tools/cache",
@@ -49,7 +49,6 @@ go_test(
     library = ":go_default_library",
     tags = ["automanaged"],
     deps = [
-        "//pkg/api:go_default_library",
         "//pkg/api/unversioned:go_default_library",
         "//pkg/api/v1:go_default_library",
         "//pkg/apis/autoscaling/install:go_default_library",
@@ -57,6 +56,8 @@ go_test(
         "//pkg/apis/extensions/install:go_default_library",
         "//pkg/apis/extensions/v1beta1:go_default_library",
         "//pkg/client/clientset_generated/clientset/fake:go_default_library",
+        "//pkg/client/informers/informers_generated:go_default_library",
+        "//pkg/controller:go_default_library",
         "//pkg/controller/podautoscaler/metrics:go_default_library",
         "//vendor:github.com/stretchr/testify/assert",
         "//vendor:github.com/stretchr/testify/require",
@@ -65,11 +66,9 @@ go_test(
         "//vendor:k8s.io/apimachinery/pkg/runtime",
         "//vendor:k8s.io/apimachinery/pkg/watch",
         "//vendor:k8s.io/client-go/kubernetes/fake",
-        "//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
         "//vendor:k8s.io/client-go/pkg/api/v1",
         "//vendor:k8s.io/client-go/rest",
         "//vendor:k8s.io/client-go/testing",
-        "//vendor:k8s.io/client-go/tools/record",
         "//vendor:k8s.io/heapster/metrics/api/v1/types",
         "//vendor:k8s.io/heapster/metrics/apis/metrics/v1alpha1",
     ],

View File

@@ -25,9 +25,7 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/watch"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	clientv1 "k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/tools/cache"
@@ -38,6 +36,8 @@ import (
 	extensionsv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
 	unversionedautoscaling "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/autoscaling/v1"
 	unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1"
+	autoscalinginformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/autoscaling/v1"
+	autoscalinglisters "k8s.io/kubernetes/pkg/client/listers/autoscaling/v1"
 )
 
 const (
@@ -65,27 +65,36 @@ type HorizontalController struct {
 	replicaCalc   *ReplicaCalculator
 	eventRecorder record.EventRecorder
 
-	// A store of HPA objects, populated by the controller.
-	store cache.Store
-	// Watches changes to all HPA objects.
-	controller cache.Controller
+	// hpaLister is able to list/get HPAs from the shared cache from the informer passed in to
+	// NewHorizontalController.
+	hpaLister       autoscalinglisters.HorizontalPodAutoscalerLister
+	hpaListerSynced cache.InformerSynced
 }
 
 var downscaleForbiddenWindow = 5 * time.Minute
 var upscaleForbiddenWindow = 3 * time.Minute
 
-func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, cache.Controller) {
-	return cache.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				return controller.hpaNamespacer.HorizontalPodAutoscalers(metav1.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return controller.hpaNamespacer.HorizontalPodAutoscalers(metav1.NamespaceAll).Watch(options)
-			},
-		},
-		&autoscaling.HorizontalPodAutoscaler{},
-		resyncPeriod,
+func NewHorizontalController(
+	evtNamespacer v1core.EventsGetter,
+	scaleNamespacer unversionedextensions.ScalesGetter,
+	hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter,
+	replicaCalc *ReplicaCalculator,
+	hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
+	resyncPeriod time.Duration,
+) *HorizontalController {
+	broadcaster := record.NewBroadcaster()
+	// TODO: remove the wrapper when every clients have moved to use the clientset.
+	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
+	recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"})
+
+	controller := &HorizontalController{
+		replicaCalc:     replicaCalc,
+		eventRecorder:   recorder,
+		scaleNamespacer: scaleNamespacer,
+		hpaNamespacer:   hpaNamespacer,
+	}
+
+	hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: func(obj interface{}) {
 				hpa := obj.(*autoscaling.HorizontalPodAutoscaler)
@@ -108,31 +117,24 @@ func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (
 			},
 			// We are not interested in deletions.
 		},
+		resyncPeriod,
 	)
-}
-
-func NewHorizontalController(evtNamespacer v1core.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, replicaCalc *ReplicaCalculator, resyncPeriod time.Duration) *HorizontalController {
-	broadcaster := record.NewBroadcaster()
-	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
-	recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"})
-
-	controller := &HorizontalController{
-		replicaCalc:     replicaCalc,
-		eventRecorder:   recorder,
-		scaleNamespacer: scaleNamespacer,
-		hpaNamespacer:   hpaNamespacer,
-	}
-	store, frameworkController := newInformer(controller, resyncPeriod)
-	controller.store = store
-	controller.controller = frameworkController
+	controller.hpaLister = hpaInformer.Lister()
+	controller.hpaListerSynced = hpaInformer.Informer().HasSynced
 
 	return controller
 }
 
 func (a *HorizontalController) Run(stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+
 	glog.Infof("Starting HPA Controller")
-	go a.controller.Run(stopCh)
+
+	if !cache.WaitForCacheSync(stopCh, a.hpaListerSynced) {
+		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+		return
+	}
+
 	<-stopCh
 	glog.Infof("Shutting down HPA Controller")
 }
@@ -412,11 +414,18 @@ func shouldScale(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desi
 func (a *HorizontalController) updateCurrentReplicasInStatus(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas int32) {
 	err := a.updateStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentCPUUtilizationPercentage, hpa.Annotations[HpaCustomMetricsStatusAnnotationName], false)
 	if err != nil {
-		glog.Errorf("%v", err)
+		utilruntime.HandleError(err)
 	}
 }
 
 func (a *HorizontalController) updateStatus(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, cpuCurrentUtilization *int32, cmStatus string, rescale bool) error {
+	// Make a copy so we don't mutate the object in the shared cache
+	copy, err := api.Scheme.DeepCopy(hpa)
+	if err != nil {
+		return nil
+	}
+	hpa = copy.(*autoscaling.HorizontalPodAutoscaler)
+
 	hpa.Status = autoscaling.HorizontalPodAutoscalerStatus{
 		CurrentReplicas: currentReplicas,
 		DesiredReplicas: desiredReplicas,
@@ -432,7 +441,7 @@ func (a *HorizontalController) updateStatus(hpa *autoscaling.HorizontalPodAutosc
 		hpa.Status.LastScaleTime = &now
 	}
 
-	_, err := a.hpaNamespacer.HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(hpa)
+	_, err = a.hpaNamespacer.HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(hpa)
 	if err != nil {
 		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedUpdateStatus", err.Error())
 		return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)
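The `DeepCopy` added above is the other half of the shared-informer contract: objects handed out by a lister are the cache's own copies, shared with every other consumer, so they must never be mutated in place. A hedged illustration of the rule — the helper name and `modify` callback are made up; `api.Scheme.DeepCopy` is simply the mechanism this tree used at the time, as in the hunk above:

```go
// mutateCopy returns a private, writable copy of an HPA taken from the shared
// cache; only the copy is modified, never the cached object itself.
func mutateCopy(hpa *autoscaling.HorizontalPodAutoscaler, modify func(*autoscaling.HorizontalPodAutoscaler)) (*autoscaling.HorizontalPodAutoscaler, error) {
	copied, err := api.Scheme.DeepCopy(hpa)
	if err != nil {
		return nil, err
	}
	out := copied.(*autoscaling.HorizontalPodAutoscaler)
	modify(out)
	return out, nil
}
```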

View File

@@ -32,17 +32,16 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/watch"
 	clientfake "k8s.io/client-go/kubernetes/fake"
-	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	clientv1 "k8s.io/client-go/pkg/api/v1"
 	restclient "k8s.io/client-go/rest"
 	core "k8s.io/client-go/testing"
-	"k8s.io/client-go/tools/record"
-	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/api/v1"
 	autoscaling "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
 	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
 
 	heapster "k8s.io/heapster/metrics/api/v1/types"
@@ -54,6 +53,8 @@ import (
 	_ "k8s.io/kubernetes/pkg/apis/extensions/install"
 )
 
+func alwaysReady() bool { return true }
+
 func (w fakeResponseWrapper) DoRaw() ([]byte, error) {
 	return w.raw, nil
 }
@@ -468,28 +469,26 @@ func (tc *testCase) runTest(t *testing.T) {
 		return true, obj, nil
 	})
 
-	broadcaster := record.NewBroadcasterForTests(0)
-	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: eventClient.Core().Events("")})
-	recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "horizontal-pod-autoscaler"})
-
 	replicaCalc := &ReplicaCalculator{
 		metricsClient: metricsClient,
 		podsGetter:    testClient.Core(),
 	}
 
-	hpaController := &HorizontalController{
-		replicaCalc:     replicaCalc,
-		eventRecorder:   recorder,
-		scaleNamespacer: testClient.Extensions(),
-		hpaNamespacer:   testClient.Autoscaling(),
-	}
+	informerFactory := informers.NewSharedInformerFactory(nil, testClient, controller.NoResyncPeriodFunc())
 
-	store, frameworkController := newInformer(hpaController, time.Minute)
-	hpaController.store = store
-	hpaController.controller = frameworkController
+	hpaController := NewHorizontalController(
+		eventClient.Core(),
+		testClient.Extensions(),
+		testClient.Autoscaling(),
+		replicaCalc,
+		informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
+		controller.NoResyncPeriodFunc(),
+	)
+	hpaController.hpaListerSynced = alwaysReady
 
 	stop := make(chan struct{})
 	defer close(stop)
+	informerFactory.Start(stop)
 	go hpaController.Run(stop)
 
 	tc.Lock()
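Unit tests follow a common pattern once a controller consumes shared informers: build the factory against a fake (or nil) client, construct the controller from the factory's typed informers, stub the `...ListerSynced` field with an always-true function so `Run()` never blocks on a watch, and inject fixtures straight into the informer stores. A hedged distillation of that pattern, again using the hypothetical `FooController` from the earlier sketches:

```go
func newTestFooController() (*FooController, cache.Store) {
	// nil clients and no resync period, exactly as the disruption test above does.
	factory := informers.NewSharedInformerFactory(nil, nil, controller.NoResyncPeriodFunc())

	c := NewFooController(factory.Core().V1().Pods())
	c.podListerSynced = func() bool { return true } // same trick as alwaysReady above

	// Test fixtures go straight into the store the lister reads from.
	podStore := factory.Core().V1().Pods().Informer().GetStore()
	return c, podStore
}
```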

View File

@@ -283,6 +283,9 @@ func ClusterRoles() []rbac.ClusterRole {
 				"persistentvolumes", "pods", "secrets", "services", "serviceaccounts", "replicationcontrollers").RuleOrDie(),
 			rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(),
 			rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(),
+			rbac.NewRule("list", "watch").Groups(appsGroup).Resources("statefulsets").RuleOrDie(),
+			rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
+			rbac.NewRule("list", "watch").Groups(autoscalingGroup).Resources("horizontalpodautoscalers").RuleOrDie(),
 		},
 	},
 	{

View File

@@ -475,6 +475,27 @@ items:
       verbs:
       - list
      - watch
+    - apiGroups:
+      - apps
+      resources:
+      - statefulsets
+      verbs:
+      - list
+      - watch
+    - apiGroups:
+      - policy
+      resources:
+      - poddisruptionbudgets
+      verbs:
+      - list
+      - watch
+    - apiGroups:
+      - autoscaling
+      resources:
+      - horizontalpodautoscalers
+      verbs:
+      - list
+      - watch
 - apiVersion: rbac.authorization.k8s.io/v1beta1
   kind: ClusterRole
   metadata:

View File

@@ -36,8 +36,8 @@ import (
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/apis/policy/v1beta1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
 	"k8s.io/kubernetes/pkg/controller/disruption"
-	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/test/integration/framework"
 )
 
@@ -50,14 +50,14 @@ const (
 func TestConcurrentEvictionRequests(t *testing.T) {
 	podNameFormat := "test-pod-%d"
 
-	s, rm, podInformer, clientSet := rmSetup(t)
+	s, rm, informers, clientSet := rmSetup(t)
 	defer s.Close()
 
 	ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t)
 	defer framework.DeleteTestingNamespace(ns, s, t)
 
 	stopCh := make(chan struct{})
-	go podInformer.Run(stopCh)
+	informers.Start(stopCh)
 	go rm.Run(stopCh)
 	defer close(stopCh)
 
@@ -87,7 +87,7 @@ func TestConcurrentEvictionRequests(t *testing.T) {
 		}
 	}
 
-	waitToObservePods(t, podInformer, numOfEvictions)
+	waitToObservePods(t, informers.Core().V1().Pods().Informer(), numOfEvictions)
 
 	pdb := newPDB()
 	if _, err := clientSet.Policy().PodDisruptionBudgets(ns.Name).Create(pdb); err != nil {
@@ -227,7 +227,7 @@ func newEviction(ns, evictionName string, deleteOption *metav1.DeleteOptions) *v
 	}
 }
 
-func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController, cache.SharedIndexInformer, clientset.Interface) {
+func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface) {
 	masterConfig := framework.NewIntegrationTestMasterConfig()
 	_, s := framework.RunAMaster(masterConfig)
 
@@ -237,13 +237,18 @@ func rmSetup(t *testing.T) (*httptest.Server, *disruption.DisruptionController,
 		t.Fatalf("Error in create clientset: %v", err)
 	}
 	resyncPeriod := 12 * time.Hour
-	informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), nil, resyncPeriod)
+	informers := informers.NewSharedInformerFactory(nil, clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pdb-informers")), resyncPeriod)
 
 	rm := disruption.NewDisruptionController(
-		informers.Pods().Informer(),
+		informers.Core().V1().Pods(),
+		informers.Policy().V1beta1().PodDisruptionBudgets(),
+		informers.Core().V1().ReplicationControllers(),
+		informers.Extensions().V1beta1().ReplicaSets(),
+		informers.Extensions().V1beta1().Deployments(),
+		informers.Apps().V1beta1().StatefulSets(),
 		clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "disruption-controller")),
 	)
-	return s, rm, informers.Pods().Informer(), clientSet
+	return s, rm, informers, clientSet
 }
 
 // wait for the podInformer to observe the pods. Call this function before