From 25fe1e0d8290b25e63670624511cdea3aabe8125 Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Thu, 12 Jan 2017 21:45:53 +0800 Subject: [PATCH] Made cache.Controller to be interface. --- .../cluster/clustercontroller.go | 2 +- .../configmap/configmap_controller.go | 4 +-- .../daemonset/daemonset_controller.go | 4 +-- .../deployment/deploymentcontroller.go | 6 ++-- .../ingress/ingress_controller.go | 6 ++-- .../namespace/namespace_controller.go | 4 +-- .../replicaset/replicasetcontroller.go | 6 ++-- .../secret/secret_controller.go | 4 +-- .../service/cluster_helper.go | 4 +-- .../service/servicecontroller.go | 6 ++-- .../util/federated_informer.go | 4 +-- .../util/federated_informer_test.go | 2 +- pkg/client/cache/controller.go | 33 +++++++++---------- pkg/client/cache/shared_informer.go | 14 +++++--- .../certificates/certificate_controller.go | 2 +- pkg/controller/disruption/disruption.go | 12 +++---- .../endpoint/endpoints_controller.go | 4 +-- .../garbagecollector/garbagecollector.go | 2 +- .../namespace/namespace_controller.go | 2 +- pkg/controller/petset/pet_set.go | 4 +-- pkg/controller/podautoscaler/horizontal.go | 4 +-- pkg/controller/podgc/gc_controller.go | 2 +- pkg/controller/podgc/gc_controller_test.go | 4 +++ .../replication/replication_controller.go | 2 +- .../resourcequota/replenishment_controller.go | 9 ++--- .../resource_quota_controller.go | 6 ++-- pkg/controller/route/routecontroller.go | 2 +- pkg/controller/service/servicecontroller.go | 2 +- .../serviceaccount/tokens_controller.go | 4 +-- .../volume/persistentvolume/pv_controller.go | 4 +-- plugin/pkg/scheduler/factory/factory.go | 12 +++---- .../client-go/tools/cache/controller.go | 32 ++++++++---------- .../client-go/tools/cache/shared_informer.go | 10 ++++-- test/e2e/daemon_restart.go | 2 +- test/e2e/network_partition.go | 2 +- test/e2e_node/density_test.go | 3 +- 36 files changed, 115 insertions(+), 110 deletions(-) diff --git a/federation/pkg/federation-controller/cluster/clustercontroller.go b/federation/pkg/federation-controller/cluster/clustercontroller.go index 65c12d091cd..89c20833253 100644 --- a/federation/pkg/federation-controller/cluster/clustercontroller.go +++ b/federation/pkg/federation-controller/cluster/clustercontroller.go @@ -49,7 +49,7 @@ type ClusterController struct { clusterKubeClientMap map[string]ClusterClient // cluster framework and store - clusterController *cache.Controller + clusterController cache.Controller clusterStore clustercache.StoreToClusterLister } diff --git a/federation/pkg/federation-controller/configmap/configmap_controller.go b/federation/pkg/federation-controller/configmap/configmap_controller.go index 4676dbb908a..7424a61f1eb 100644 --- a/federation/pkg/federation-controller/configmap/configmap_controller.go +++ b/federation/pkg/federation-controller/configmap/configmap_controller.go @@ -58,7 +58,7 @@ type ConfigMapController struct { // Definitions of configmaps that should be federated. configmapInformerStore cache.Store // Informer controller for configmaps that should be federated. - configmapInformerController cache.ControllerInterface + configmapInformerController cache.Controller // Client to federated api server. federatedApiClient federationclientset.Interface @@ -112,7 +112,7 @@ func NewConfigMapController(client federationclientset.Interface) *ConfigMapCont // Federated informer on configmaps in members of federation. configmapcontroller.configmapFederatedInformer = util.NewFederatedInformer( client, - func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options apiv1.ListOptions) (pkgruntime.Object, error) { diff --git a/federation/pkg/federation-controller/daemonset/daemonset_controller.go b/federation/pkg/federation-controller/daemonset/daemonset_controller.go index f35361a89c4..628226aba06 100644 --- a/federation/pkg/federation-controller/daemonset/daemonset_controller.go +++ b/federation/pkg/federation-controller/daemonset/daemonset_controller.go @@ -63,7 +63,7 @@ type DaemonSetController struct { // Definitions of daemonsets that should be federated. daemonsetInformerStore cache.Store // Informer controller for daemonsets that should be federated. - daemonsetInformerController cache.ControllerInterface + daemonsetInformerController cache.Controller // Client to federated api server. federatedApiClient federationclientset.Interface @@ -119,7 +119,7 @@ func NewDaemonSetController(client federationclientset.Interface) *DaemonSetCont // Federated informer on daemonsets in members of federation. daemonsetcontroller.daemonsetFederatedInformer = util.NewFederatedInformer( client, - func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options apiv1.ListOptions) (pkgruntime.Object, error) { diff --git a/federation/pkg/federation-controller/deployment/deploymentcontroller.go b/federation/pkg/federation-controller/deployment/deploymentcontroller.go index e044b9402e2..d0fba0bdec1 100644 --- a/federation/pkg/federation-controller/deployment/deploymentcontroller.go +++ b/federation/pkg/federation-controller/deployment/deploymentcontroller.go @@ -80,7 +80,7 @@ func parseFederationDeploymentPreference(fd *extensionsv1.Deployment) (*fed.Fede type DeploymentController struct { fedClient fedclientset.Interface - deploymentController *cache.Controller + deploymentController cache.Controller deploymentStore cache.Store fedDeploymentInformer fedutil.FederatedInformer @@ -119,7 +119,7 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen eventRecorder: recorder, } - deploymentFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + deploymentFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { @@ -146,7 +146,7 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen } fdc.fedDeploymentInformer = fedutil.NewFederatedInformer(federationClient, deploymentFedInformerFactory, &clusterLifecycle) - podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go index 1ee178b86b4..49c2302b64a 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller.go @@ -91,7 +91,7 @@ type IngressController struct { // Definitions of ingresses that should be federated. ingressInformerStore cache.Store // Informer controller for ingresses that should be federated. - ingressInformerController cache.ControllerInterface + ingressInformerController cache.Controller // Client to federated api server. federatedApiClient federationclientset.Interface @@ -157,7 +157,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll // Federated informer on ingresses in members of federation. ic.ingressFederatedInformer = util.NewFederatedInformer( client, - func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) { @@ -189,7 +189,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll // Federated informer on configmaps for ingress controllers in members of the federation. ic.configMapFederatedInformer = util.NewFederatedInformer( client, - func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { glog.V(4).Infof("Returning new informer for cluster %q", cluster.Name) return cache.NewInformer( &cache.ListWatch{ diff --git a/federation/pkg/federation-controller/namespace/namespace_controller.go b/federation/pkg/federation-controller/namespace/namespace_controller.go index 756daee328f..1bad368c400 100644 --- a/federation/pkg/federation-controller/namespace/namespace_controller.go +++ b/federation/pkg/federation-controller/namespace/namespace_controller.go @@ -60,7 +60,7 @@ type NamespaceController struct { // Definitions of namespaces that should be federated. namespaceInformerStore cache.Store // Informer controller for namespaces that should be federated. - namespaceInformerController cache.ControllerInterface + namespaceInformerController cache.Controller // Client to federated api server. federatedApiClient federationclientset.Interface @@ -116,7 +116,7 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont // Federated informer on namespaces in members of federation. nc.namespaceFederatedInformer = util.NewFederatedInformer( client, - func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { diff --git a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go index a31b16fb141..35a85e050e3 100644 --- a/federation/pkg/federation-controller/replicaset/replicasetcontroller.go +++ b/federation/pkg/federation-controller/replicaset/replicasetcontroller.go @@ -80,7 +80,7 @@ func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.Fede type ReplicaSetController struct { fedClient fedclientset.Interface - replicaSetController *cache.Controller + replicaSetController cache.Controller replicaSetStore cache.StoreToReplicaSetLister fedReplicaSetInformer fedutil.FederatedInformer @@ -121,7 +121,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe eventRecorder: recorder, } - replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { @@ -148,7 +148,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe } frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle) - podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { diff --git a/federation/pkg/federation-controller/secret/secret_controller.go b/federation/pkg/federation-controller/secret/secret_controller.go index 858c819d1ee..bd48df0ccaf 100644 --- a/federation/pkg/federation-controller/secret/secret_controller.go +++ b/federation/pkg/federation-controller/secret/secret_controller.go @@ -61,7 +61,7 @@ type SecretController struct { // Definitions of secrets that should be federated. secretInformerStore cache.Store // Informer controller for secrets that should be federated. - secretInformerController cache.ControllerInterface + secretInformerController cache.Controller // Client to federated api server. federatedApiClient federationclientset.Interface @@ -117,7 +117,7 @@ func NewSecretController(client federationclientset.Interface) *SecretController // Federated informer on secrets in members of federation. secretcontroller.secretFederatedInformer = util.NewFederatedInformer( client, - func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options apiv1.ListOptions) (pkgruntime.Object, error) { diff --git a/federation/pkg/federation-controller/service/cluster_helper.go b/federation/pkg/federation-controller/service/cluster_helper.go index ede0b1313d2..242678536c9 100644 --- a/federation/pkg/federation-controller/service/cluster_helper.go +++ b/federation/pkg/federation-controller/service/cluster_helper.go @@ -41,11 +41,11 @@ type clusterCache struct { // A store of services, populated by the serviceController serviceStore cache.StoreToServiceLister // Watches changes to all services - serviceController *cache.Controller + serviceController cache.Controller // A store of endpoint, populated by the serviceController endpointStore cache.StoreToEndpointsLister // Watches changes to all endpoints - endpointController *cache.Controller + endpointController cache.Controller // services that need to be synced serviceQueue *workqueue.Type // endpoints that need to be synced diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 9aa68e9b4d6..74f0ad4d3b2 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -121,12 +121,12 @@ type ServiceController struct { // A store of services, populated by the serviceController serviceStore cache.StoreToServiceLister // Watches changes to all services - serviceController *cache.Controller + serviceController cache.Controller federatedInformer fedutil.FederatedInformer // A store of services, populated by the serviceController clusterStore federationcache.StoreToClusterLister // Watches changes to all services - clusterController *cache.Controller + clusterController cache.Controller eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder // services that need to be synced @@ -245,7 +245,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, s.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) }, } - fedInformerFactory := func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + fedInformerFactory := func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) { diff --git a/federation/pkg/federation-controller/util/federated_informer.go b/federation/pkg/federation-controller/util/federated_informer.go index a99a9e3c074..2ef0beb9a35 100644 --- a/federation/pkg/federation-controller/util/federated_informer.go +++ b/federation/pkg/federation-controller/util/federated_informer.go @@ -116,7 +116,7 @@ type FederatedInformerForTestOnly interface { // A function that should be used to create an informer on the target object. Store should use // cache.DeletionHandlingMetaNamespaceKeyFunc as a keying function. -type TargetInformerFactory func(*federationapi.Cluster, kubeclientset.Interface) (cache.Store, cache.ControllerInterface) +type TargetInformerFactory func(*federationapi.Cluster, kubeclientset.Interface) (cache.Store, cache.Controller) // A structure with cluster lifecycle handler functions. Cluster is available (and ClusterAvailable is fired) // when it is created in federated etcd and ready. Cluster becomes unavailable (and ClusterUnavailable is fired) @@ -242,7 +242,7 @@ func isClusterReady(cluster *federationapi.Cluster) bool { } type informer struct { - controller cache.ControllerInterface + controller cache.Controller store cache.Store stopChan chan struct{} } diff --git a/federation/pkg/federation-controller/util/federated_informer_test.go b/federation/pkg/federation-controller/util/federated_informer_test.go index 1ab120bcc9e..8c02998c8ee 100644 --- a/federation/pkg/federation-controller/util/federated_informer_test.go +++ b/federation/pkg/federation-controller/util/federated_informer_test.go @@ -77,7 +77,7 @@ func TestFederatedInformer(t *testing.T) { return true, watch.NewFake(), nil }) - targetInformerFactory := func(cluster *federationapi.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { + targetInformerFactory := func(cluster *federationapi.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { diff --git a/pkg/client/cache/controller.go b/pkg/client/cache/controller.go index 314b7617b4f..fc756eb8f32 100644 --- a/pkg/client/cache/controller.go +++ b/pkg/client/cache/controller.go @@ -61,21 +61,21 @@ type Config struct { type ProcessFunc func(obj interface{}) error // Controller is a generic controller framework. -type Controller struct { +type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex } -// TODO make the "Controller" private, and convert all references to use ControllerInterface instead -type ControllerInterface interface { +type Controller interface { Run(stopCh <-chan struct{}) HasSynced() bool + LastSyncResourceVersion() string } // New makes a new Controller from the given Config. -func New(c *Config) *Controller { - ctlr := &Controller{ +func New(c *Config) Controller { + ctlr := &controller{ config: *c, } return ctlr @@ -84,7 +84,7 @@ func New(c *Config) *Controller { // Run begins processing items, and will continue until a value is sent down stopCh. // It's an error to call Run more than once. // Run blocks; call via go. -func (c *Controller) Run(stopCh <-chan struct{}) { +func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() r := NewReflector( c.config.ListerWatcher, @@ -103,18 +103,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } // Returns true once this controller has completed an initial resource listing -func (c *Controller) HasSynced() bool { +func (c *controller) HasSynced() bool { return c.config.Queue.HasSynced() } -// Requeue adds the provided object back into the queue if it does not already exist. -func (c *Controller) Requeue(obj interface{}) error { - return c.config.Queue.AddIfNotPresent(Deltas{ - Delta{ - Type: Sync, - Object: obj, - }, - }) +func (c *controller) LastSyncResourceVersion() string { + if c.reflector == nil { + return "" + } + return c.reflector.LastSyncResourceVersion() } // processLoop drains the work queue. @@ -126,7 +123,7 @@ func (c *Controller) Requeue(obj interface{}) error { // actually exit when the controller is stopped. Or just give up on this stuff // ever being stoppable. Converting this whole package to use Context would // also be helpful. -func (c *Controller) processLoop() { +func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { @@ -218,7 +215,7 @@ func NewInformer( objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, -) (Store, *Controller) { +) (Store, Controller) { // This will hold the client state, as we know it. clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) @@ -284,7 +281,7 @@ func NewIndexerInformer( resyncPeriod time.Duration, h ResourceEventHandler, indexers Indexers, -) (Indexer, *Controller) { +) (Indexer, Controller) { // This will hold the client state, as we know it. clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) diff --git a/pkg/client/cache/shared_informer.go b/pkg/client/cache/shared_informer.go index 7bdf7eaa434..840211effae 100644 --- a/pkg/client/cache/shared_informer.go +++ b/pkg/client/cache/shared_informer.go @@ -43,7 +43,7 @@ type SharedInformer interface { AddEventHandler(handler ResourceEventHandler) error GetStore() Store // GetController gives back a synthetic interface that "votes" to start the informer - GetController() ControllerInterface + GetController() Controller Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string @@ -108,7 +108,7 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool type sharedIndexInformer struct { indexer Indexer - controller *Controller + controller Controller processor *sharedProcessor cacheMutationDetector CacheMutationDetector @@ -145,6 +145,10 @@ func (v *dummyController) HasSynced() bool { return v.informer.HasSynced() } +func (c *dummyController) LastSyncResourceVersion() string { + return "" +} + type updateNotification struct { oldObj interface{} newObj interface{} @@ -207,10 +211,10 @@ func (s *sharedIndexInformer) LastSyncResourceVersion() string { s.startedLock.Lock() defer s.startedLock.Unlock() - if s.controller == nil || s.controller.reflector == nil { + if s.controller == nil { return "" } - return s.controller.reflector.LastSyncResourceVersion() + return s.controller.LastSyncResourceVersion() } func (s *sharedIndexInformer) GetStore() Store { @@ -232,7 +236,7 @@ func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error { return s.indexer.AddIndexers(indexers) } -func (s *sharedIndexInformer) GetController() ControllerInterface { +func (s *sharedIndexInformer) GetController() Controller { return &dummyController{informer: s} } diff --git a/pkg/controller/certificates/certificate_controller.go b/pkg/controller/certificates/certificate_controller.go index cfb3b1a63c0..8d7d4141726 100644 --- a/pkg/controller/certificates/certificate_controller.go +++ b/pkg/controller/certificates/certificate_controller.go @@ -48,7 +48,7 @@ type CertificateController struct { kubeClient clientset.Interface // CSR framework and store - csrController *cache.Controller + csrController cache.Controller csrStore cache.StoreToCertificateRequestLister syncHandler func(csrKey string) error diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index cca99f3af75..21d34969c28 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -63,26 +63,26 @@ type DisruptionController struct { kubeClient clientset.Interface pdbStore cache.Store - pdbController *cache.Controller + pdbController cache.Controller pdbLister cache.StoreToPodDisruptionBudgetLister - podController cache.ControllerInterface + podController cache.Controller podLister cache.StoreToPodLister rcIndexer cache.Indexer - rcController *cache.Controller + rcController cache.Controller rcLister cache.StoreToReplicationControllerLister rsStore cache.Store - rsController *cache.Controller + rsController cache.Controller rsLister cache.StoreToReplicaSetLister dIndexer cache.Indexer - dController *cache.Controller + dController cache.Controller dLister cache.StoreToDeploymentLister ssStore cache.Store - ssController *cache.Controller + ssController cache.Controller ssLister cache.StoreToStatefulSetLister // PodDisruptionBudget keys that need to be synced. diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index ce93a955213..50e5f0c8fac 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -147,8 +147,8 @@ type EndpointController struct { // Since we join two objects, we'll watch both of them with // controllers. - serviceController *cache.Controller - podController cache.ControllerInterface + serviceController cache.Controller + podController cache.Controller // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced func() bool diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 16571f6849d..effc27a9afc 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -48,7 +48,7 @@ const ResourceResyncTime time.Duration = 0 type monitor struct { store cache.Store - controller *cache.Controller + controller cache.Controller } type objectReference struct { diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index 00a2a22c13f..e5878122a80 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -56,7 +56,7 @@ type NamespaceController struct { // store that holds the namespaces store cache.Store // controller that observes the namespaces - controller *cache.Controller + controller cache.Controller // namespaces that have been queued up for processing by workers queue workqueue.RateLimitingInterface // function to list of preferred resources for namespace deletion diff --git a/pkg/controller/petset/pet_set.go b/pkg/controller/petset/pet_set.go index abe4f5fd2dd..36b98dc913c 100644 --- a/pkg/controller/petset/pet_set.go +++ b/pkg/controller/petset/pet_set.go @@ -64,12 +64,12 @@ type StatefulSetController struct { // podStoreSynced returns true if the pod store has synced at least once. podStoreSynced func() bool // Watches changes to all pods. - podController cache.ControllerInterface + podController cache.Controller // A store of StatefulSets, populated by the psController. psStore cache.StoreToStatefulSetLister // Watches changes to all StatefulSets. - psController *cache.Controller + psController cache.Controller // A store of the 1 unhealthy pet blocking progress for a given ps blockingPetStore *unhealthyPetTracker diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 4d04f7a20fb..3a1a93fcdb3 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -66,13 +66,13 @@ type HorizontalController struct { // A store of HPA objects, populated by the controller. store cache.Store // Watches changes to all HPA objects. - controller *cache.Controller + controller cache.Controller } var downscaleForbiddenWindow = 5 * time.Minute var upscaleForbiddenWindow = 3 * time.Minute -func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, *cache.Controller) { +func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, cache.Controller) { return cache.NewInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { diff --git a/pkg/controller/podgc/gc_controller.go b/pkg/controller/podgc/gc_controller.go index 83a85805bdd..b93b9c125df 100644 --- a/pkg/controller/podgc/gc_controller.go +++ b/pkg/controller/podgc/gc_controller.go @@ -50,7 +50,7 @@ type PodGCController struct { internalPodInformer cache.SharedIndexInformer podStore cache.StoreToPodLister - podController cache.ControllerInterface + podController cache.Controller deletePod func(namespace, name string) error terminatedPodThreshold int diff --git a/pkg/controller/podgc/gc_controller_test.go b/pkg/controller/podgc/gc_controller_test.go index d123bbc5997..f0a075b7279 100644 --- a/pkg/controller/podgc/gc_controller_test.go +++ b/pkg/controller/podgc/gc_controller_test.go @@ -37,6 +37,10 @@ func (*FakeController) HasSynced() bool { return true } +func (*FakeController) LastSyncResourceVersion() string { + return "" +} + func TestGCTerminated(t *testing.T) { type nameToPhase struct { name string diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index ec1197c8e97..e283bd4be6d 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -86,7 +86,7 @@ type ReplicationManager struct { // A store of pods, populated by the podController podLister cache.StoreToPodLister // Watches changes to all pods - podController cache.ControllerInterface + podController cache.Controller // podListerSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podListerSynced func() bool diff --git a/pkg/controller/resourcequota/replenishment_controller.go b/pkg/controller/resourcequota/replenishment_controller.go index a9fc5f9a587..fc34a73b261 100644 --- a/pkg/controller/resourcequota/replenishment_controller.go +++ b/pkg/controller/resourcequota/replenishment_controller.go @@ -90,7 +90,7 @@ func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func type ReplenishmentControllerFactory interface { // NewController returns a controller configured with the specified options. // This method is NOT thread-safe. - NewController(options *ReplenishmentControllerOptions) (cache.ControllerInterface, error) + NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) } // replenishmentControllerFactory implements ReplenishmentControllerFactory @@ -118,7 +118,8 @@ func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) func controllerFor( groupResource schema.GroupResource, f informers.SharedInformerFactory, - handlerFuncs cache.ResourceEventHandlerFuncs) (cache.ControllerInterface, error) { + handlerFuncs cache.ResourceEventHandlerFuncs, +) (cache.Controller, error) { genericInformer, err := f.ForResource(groupResource) if err != nil { return nil, err @@ -128,7 +129,7 @@ func controllerFor( return informer.GetController(), nil } -func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.ControllerInterface, err error) { +func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.Controller, err error) { if r.kubeClient != nil && r.kubeClient.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller", r.kubeClient.Core().RESTClient().GetRateLimiter()) } @@ -276,7 +277,7 @@ func IsUnhandledGroupKindError(err error) bool { // returning the first success or failure it hits. If there are no hits either way, it return an UnhandledGroupKind error type UnionReplenishmentControllerFactory []ReplenishmentControllerFactory -func (f UnionReplenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.ControllerInterface, error) { +func (f UnionReplenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) { for _, factory := range f { controller, err := factory.NewController(options) if !IsUnhandledGroupKindError(err) { diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index a224e77f238..0979065a79f 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -60,7 +60,7 @@ type ResourceQuotaController struct { // An index of resource quota objects by namespace rqIndexer cache.Indexer // Watches changes to all resource quota - rqController *cache.Controller + rqController cache.Controller // ResourceQuota objects that need to be synchronized queue workqueue.RateLimitingInterface // missingUsageQueue holds objects that are missing the initial usage informatino @@ -72,7 +72,7 @@ type ResourceQuotaController struct { // knows how to calculate usage registry quota.Registry // controllers monitoring to notify for replenishment - replenishmentControllers []cache.ControllerInterface + replenishmentControllers []cache.Controller } func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController { @@ -83,7 +83,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"), resyncPeriod: options.ResyncPeriod, registry: options.Registry, - replenishmentControllers: []cache.ControllerInterface{}, + replenishmentControllers: []cache.Controller{}, } if options.KubeClient != nil && options.KubeClient.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", options.KubeClient.Core().RESTClient().GetRateLimiter()) diff --git a/pkg/controller/route/routecontroller.go b/pkg/controller/route/routecontroller.go index 543921c188e..d11291c2faa 100644 --- a/pkg/controller/route/routecontroller.go +++ b/pkg/controller/route/routecontroller.go @@ -54,7 +54,7 @@ type RouteController struct { clusterName string clusterCIDR *net.IPNet // Node framework and store - nodeController *cache.Controller + nodeController cache.Controller nodeStore cache.StoreToNodeLister } diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 4c72cf0f430..0d4e5d655ea 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -87,7 +87,7 @@ type ServiceController struct { // A store of services, populated by the serviceController serviceStore cache.StoreToServiceLister // Watches changes to all services - serviceController *cache.Controller + serviceController cache.Controller eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder nodeLister cache.StoreToNodeLister diff --git a/pkg/controller/serviceaccount/tokens_controller.go b/pkg/controller/serviceaccount/tokens_controller.go index 0cf48e8b8ce..d69ba10276f 100644 --- a/pkg/controller/serviceaccount/tokens_controller.go +++ b/pkg/controller/serviceaccount/tokens_controller.go @@ -145,8 +145,8 @@ type TokensController struct { secrets cache.Indexer // Since we join two objects, we'll watch both of them with controllers. - serviceAccountController *cache.Controller - secretController *cache.Controller + serviceAccountController cache.Controller + secretController cache.Controller // syncServiceAccountQueue handles service account events: // * ensures a referenced token exists for service accounts which still exist diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index b12de470f64..908f5d9a80e 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -146,10 +146,10 @@ const createProvisionedPVInterval = 10 * time.Second // cache.Controllers that watch PersistentVolume and PersistentVolumeClaim // changes. type PersistentVolumeController struct { - volumeController *cache.Controller + volumeController cache.Controller volumeInformer cache.Indexer volumeSource cache.ListerWatcher - claimController *cache.Controller + claimController cache.Controller claimInformer cache.Store claimSource cache.ListerWatcher classReflector *cache.Reflector diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 53b851c63e3..87d0f2ec6b0 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -80,12 +80,12 @@ type ConfigFactory struct { StopEverything chan struct{} informerFactory informers.SharedInformerFactory - scheduledPodPopulator cache.ControllerInterface - nodePopulator cache.ControllerInterface - pvPopulator cache.ControllerInterface - pvcPopulator cache.ControllerInterface - servicePopulator cache.ControllerInterface - controllerPopulator cache.ControllerInterface + scheduledPodPopulator cache.Controller + nodePopulator cache.Controller + pvPopulator cache.Controller + pvcPopulator cache.Controller + servicePopulator cache.Controller + controllerPopulator cache.Controller schedulerCache schedulercache.Cache diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index ff4ec0122ab..ab0cdb58a62 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -61,21 +61,20 @@ type Config struct { type ProcessFunc func(obj interface{}) error // Controller is a generic controller framework. -type Controller struct { +type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex } -// TODO make the "Controller" private, and convert all references to use ControllerInterface instead -type ControllerInterface interface { +type Controller interface { Run(stopCh <-chan struct{}) HasSynced() bool } // New makes a new Controller from the given Config. -func New(c *Config) *Controller { - ctlr := &Controller{ +func New(c *Config) Controller { + ctlr := &controller{ config: *c, } return ctlr @@ -84,7 +83,7 @@ func New(c *Config) *Controller { // Run begins processing items, and will continue until a value is sent down stopCh. // It's an error to call Run more than once. // Run blocks; call via go. -func (c *Controller) Run(stopCh <-chan struct{}) { +func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() r := NewReflector( c.config.ListerWatcher, @@ -103,18 +102,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } // Returns true once this controller has completed an initial resource listing -func (c *Controller) HasSynced() bool { +func (c *controller) HasSynced() bool { return c.config.Queue.HasSynced() } -// Requeue adds the provided object back into the queue if it does not already exist. -func (c *Controller) Requeue(obj interface{}) error { - return c.config.Queue.AddIfNotPresent(Deltas{ - Delta{ - Type: Sync, - Object: obj, - }, - }) +func (c *controller) LastSyncResourceVersion() string { + if c.reflector == nil { + return "" + } + return c.reflector.LastSyncResourceVersion() } // processLoop drains the work queue. @@ -126,7 +122,7 @@ func (c *Controller) Requeue(obj interface{}) error { // actually exit when the controller is stopped. Or just give up on this stuff // ever being stoppable. Converting this whole package to use Context would // also be helpful. -func (c *Controller) processLoop() { +func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { @@ -218,7 +214,7 @@ func NewInformer( objType runtime.Object, resyncPeriod time.Duration, h ResourceEventHandler, -) (Store, *Controller) { +) (Store, Controller) { // This will hold the client state, as we know it. clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) @@ -284,7 +280,7 @@ func NewIndexerInformer( resyncPeriod time.Duration, h ResourceEventHandler, indexers Indexers, -) (Indexer, *Controller) { +) (Indexer, Controller) { // This will hold the client state, as we know it. clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 7687bca4768..65f13bc0667 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -43,7 +43,7 @@ type SharedInformer interface { AddEventHandler(handler ResourceEventHandler) error GetStore() Store // GetController gives back a synthetic interface that "votes" to start the informer - GetController() ControllerInterface + GetController() Controller Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string @@ -108,7 +108,7 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool type sharedIndexInformer struct { indexer Indexer - controller *Controller + controller Controller processor *sharedProcessor cacheMutationDetector CacheMutationDetector @@ -145,6 +145,10 @@ func (v *dummyController) HasSynced() bool { return v.informer.HasSynced() } +func (c *dummyController) LastSyncResourceVersion() string { + return "" +} + type updateNotification struct { oldObj interface{} newObj interface{} @@ -232,7 +236,7 @@ func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error { return s.indexer.AddIndexers(indexers) } -func (s *sharedIndexInformer) GetController() ControllerInterface { +func (s *sharedIndexInformer) GetController() Controller { return &dummyController{informer: s} } diff --git a/test/e2e/daemon_restart.go b/test/e2e/daemon_restart.go index 76c37f120f4..d1f5284cc47 100644 --- a/test/e2e/daemon_restart.go +++ b/test/e2e/daemon_restart.go @@ -192,7 +192,7 @@ var _ = framework.KubeDescribe("DaemonRestart [Disruptive]", func() { existingPods := cache.NewStore(cache.MetaNamespaceKeyFunc) var ns string var config testutils.RCConfig - var controller *cache.Controller + var controller cache.Controller var newPods cache.Store var stopCh chan struct{} var tracker *podTracker diff --git a/test/e2e/network_partition.go b/test/e2e/network_partition.go index 06599922f60..d2666761476 100644 --- a/test/e2e/network_partition.go +++ b/test/e2e/network_partition.go @@ -188,7 +188,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() { nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name) stopCh := make(chan struct{}) newNode := make(chan *v1.Node) - var controller *cache.Controller + var controller cache.Controller _, controller = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { diff --git a/test/e2e_node/density_test.go b/test/e2e_node/density_test.go index 2936d65a3c1..4ce9f07e024 100644 --- a/test/e2e_node/density_test.go +++ b/test/e2e_node/density_test.go @@ -476,8 +476,7 @@ func verifyPodStartupLatency(expect, actual framework.LatencyMetric) error { } // newInformerWatchPod creates an informer to check whether all pods are running. -func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]metav1.Time, - podType string) *cache.Controller { +func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]metav1.Time, podType string) cache.Controller { ns := f.Namespace.Name checkPodRunning := func(p *v1.Pod) { mutex.Lock()