Merge pull request #37505 from k82cn/use_controller_inf

Automatic merge from submit-queue (batch tested with PRs 39807, 37505, 39844, 39525, 39109)

Made cache.Controller to be interface.

**What this PR does / why we need it**:

#37504
This commit is contained in:
Kubernetes Submit Queue 2017-01-13 13:40:41 -08:00 committed by GitHub
commit 6b5d82b512
36 changed files with 115 additions and 110 deletions

View File

@ -49,7 +49,7 @@ type ClusterController struct {
clusterKubeClientMap map[string]ClusterClient clusterKubeClientMap map[string]ClusterClient
// cluster framework and store // cluster framework and store
clusterController *cache.Controller clusterController cache.Controller
clusterStore clustercache.StoreToClusterLister clusterStore clustercache.StoreToClusterLister
} }

View File

@ -58,7 +58,7 @@ type ConfigMapController struct {
// Definitions of configmaps that should be federated. // Definitions of configmaps that should be federated.
configmapInformerStore cache.Store configmapInformerStore cache.Store
// Informer controller for configmaps that should be federated. // Informer controller for configmaps that should be federated.
configmapInformerController cache.ControllerInterface configmapInformerController cache.Controller
// Client to federated api server. // Client to federated api server.
federatedApiClient federationclientset.Interface federatedApiClient federationclientset.Interface
@ -112,7 +112,7 @@ func NewConfigMapController(client federationclientset.Interface) *ConfigMapCont
// Federated informer on configmaps in members of federation. // Federated informer on configmaps in members of federation.
configmapcontroller.configmapFederatedInformer = util.NewFederatedInformer( configmapcontroller.configmapFederatedInformer = util.NewFederatedInformer(
client, client,
func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options apiv1.ListOptions) (pkgruntime.Object, error) { ListFunc: func(options apiv1.ListOptions) (pkgruntime.Object, error) {

View File

@ -63,7 +63,7 @@ type DaemonSetController struct {
// Definitions of daemonsets that should be federated. // Definitions of daemonsets that should be federated.
daemonsetInformerStore cache.Store daemonsetInformerStore cache.Store
// Informer controller for daemonsets that should be federated. // Informer controller for daemonsets that should be federated.
daemonsetInformerController cache.ControllerInterface daemonsetInformerController cache.Controller
// Client to federated api server. // Client to federated api server.
federatedApiClient federationclientset.Interface federatedApiClient federationclientset.Interface
@ -119,7 +119,7 @@ func NewDaemonSetController(client federationclientset.Interface) *DaemonSetCont
// Federated informer on daemonsets in members of federation. // Federated informer on daemonsets in members of federation.
daemonsetcontroller.daemonsetFederatedInformer = util.NewFederatedInformer( daemonsetcontroller.daemonsetFederatedInformer = util.NewFederatedInformer(
client, client,
func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options apiv1.ListOptions) (pkgruntime.Object, error) { ListFunc: func(options apiv1.ListOptions) (pkgruntime.Object, error) {

View File

@ -80,7 +80,7 @@ func parseFederationDeploymentPreference(fd *extensionsv1.Deployment) (*fed.Fede
type DeploymentController struct { type DeploymentController struct {
fedClient fedclientset.Interface fedClient fedclientset.Interface
deploymentController *cache.Controller deploymentController cache.Controller
deploymentStore cache.Store deploymentStore cache.Store
fedDeploymentInformer fedutil.FederatedInformer fedDeploymentInformer fedutil.FederatedInformer
@ -119,7 +119,7 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen
eventRecorder: recorder, eventRecorder: recorder,
} }
deploymentFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { deploymentFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) {
@ -146,7 +146,7 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen
} }
fdc.fedDeploymentInformer = fedutil.NewFederatedInformer(federationClient, deploymentFedInformerFactory, &clusterLifecycle) fdc.fedDeploymentInformer = fedutil.NewFederatedInformer(federationClient, deploymentFedInformerFactory, &clusterLifecycle)
podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) {

View File

@ -91,7 +91,7 @@ type IngressController struct {
// Definitions of ingresses that should be federated. // Definitions of ingresses that should be federated.
ingressInformerStore cache.Store ingressInformerStore cache.Store
// Informer controller for ingresses that should be federated. // Informer controller for ingresses that should be federated.
ingressInformerController cache.ControllerInterface ingressInformerController cache.Controller
// Client to federated api server. // Client to federated api server.
federatedApiClient federationclientset.Interface federatedApiClient federationclientset.Interface
@ -157,7 +157,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
// Federated informer on ingresses in members of federation. // Federated informer on ingresses in members of federation.
ic.ingressFederatedInformer = util.NewFederatedInformer( ic.ingressFederatedInformer = util.NewFederatedInformer(
client, client,
func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) { ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) {
@ -189,7 +189,7 @@ func NewIngressController(client federationclientset.Interface) *IngressControll
// Federated informer on configmaps for ingress controllers in members of the federation. // Federated informer on configmaps for ingress controllers in members of the federation.
ic.configMapFederatedInformer = util.NewFederatedInformer( ic.configMapFederatedInformer = util.NewFederatedInformer(
client, client,
func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
glog.V(4).Infof("Returning new informer for cluster %q", cluster.Name) glog.V(4).Infof("Returning new informer for cluster %q", cluster.Name)
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{

View File

@ -60,7 +60,7 @@ type NamespaceController struct {
// Definitions of namespaces that should be federated. // Definitions of namespaces that should be federated.
namespaceInformerStore cache.Store namespaceInformerStore cache.Store
// Informer controller for namespaces that should be federated. // Informer controller for namespaces that should be federated.
namespaceInformerController cache.ControllerInterface namespaceInformerController cache.Controller
// Client to federated api server. // Client to federated api server.
federatedApiClient federationclientset.Interface federatedApiClient federationclientset.Interface
@ -116,7 +116,7 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont
// Federated informer on namespaces in members of federation. // Federated informer on namespaces in members of federation.
nc.namespaceFederatedInformer = util.NewFederatedInformer( nc.namespaceFederatedInformer = util.NewFederatedInformer(
client, client,
func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) {

View File

@ -80,7 +80,7 @@ func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.Fede
type ReplicaSetController struct { type ReplicaSetController struct {
fedClient fedclientset.Interface fedClient fedclientset.Interface
replicaSetController *cache.Controller replicaSetController cache.Controller
replicaSetStore cache.StoreToReplicaSetLister replicaSetStore cache.StoreToReplicaSetLister
fedReplicaSetInformer fedutil.FederatedInformer fedReplicaSetInformer fedutil.FederatedInformer
@ -121,7 +121,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
eventRecorder: recorder, eventRecorder: recorder,
} }
replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) {
@ -148,7 +148,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
} }
frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle) frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle)
podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) {

View File

@ -61,7 +61,7 @@ type SecretController struct {
// Definitions of secrets that should be federated. // Definitions of secrets that should be federated.
secretInformerStore cache.Store secretInformerStore cache.Store
// Informer controller for secrets that should be federated. // Informer controller for secrets that should be federated.
secretInformerController cache.ControllerInterface secretInformerController cache.Controller
// Client to federated api server. // Client to federated api server.
federatedApiClient federationclientset.Interface federatedApiClient federationclientset.Interface
@ -117,7 +117,7 @@ func NewSecretController(client federationclientset.Interface) *SecretController
// Federated informer on secrets in members of federation. // Federated informer on secrets in members of federation.
secretcontroller.secretFederatedInformer = util.NewFederatedInformer( secretcontroller.secretFederatedInformer = util.NewFederatedInformer(
client, client,
func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { func(cluster *federationapi.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options apiv1.ListOptions) (pkgruntime.Object, error) { ListFunc: func(options apiv1.ListOptions) (pkgruntime.Object, error) {

View File

@ -41,11 +41,11 @@ type clusterCache struct {
// A store of services, populated by the serviceController // A store of services, populated by the serviceController
serviceStore cache.StoreToServiceLister serviceStore cache.StoreToServiceLister
// Watches changes to all services // Watches changes to all services
serviceController *cache.Controller serviceController cache.Controller
// A store of endpoint, populated by the serviceController // A store of endpoint, populated by the serviceController
endpointStore cache.StoreToEndpointsLister endpointStore cache.StoreToEndpointsLister
// Watches changes to all endpoints // Watches changes to all endpoints
endpointController *cache.Controller endpointController cache.Controller
// services that need to be synced // services that need to be synced
serviceQueue *workqueue.Type serviceQueue *workqueue.Type
// endpoints that need to be synced // endpoints that need to be synced

View File

@ -121,12 +121,12 @@ type ServiceController struct {
// A store of services, populated by the serviceController // A store of services, populated by the serviceController
serviceStore cache.StoreToServiceLister serviceStore cache.StoreToServiceLister
// Watches changes to all services // Watches changes to all services
serviceController *cache.Controller serviceController cache.Controller
federatedInformer fedutil.FederatedInformer federatedInformer fedutil.FederatedInformer
// A store of services, populated by the serviceController // A store of services, populated by the serviceController
clusterStore federationcache.StoreToClusterLister clusterStore federationcache.StoreToClusterLister
// Watches changes to all services // Watches changes to all services
clusterController *cache.Controller clusterController cache.Controller
eventBroadcaster record.EventBroadcaster eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder eventRecorder record.EventRecorder
// services that need to be synced // services that need to be synced
@ -245,7 +245,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
s.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) s.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
}, },
} }
fedInformerFactory := func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { fedInformerFactory := func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) { ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) {

View File

@ -116,7 +116,7 @@ type FederatedInformerForTestOnly interface {
// A function that should be used to create an informer on the target object. Store should use // A function that should be used to create an informer on the target object. Store should use
// cache.DeletionHandlingMetaNamespaceKeyFunc as a keying function. // cache.DeletionHandlingMetaNamespaceKeyFunc as a keying function.
type TargetInformerFactory func(*federationapi.Cluster, kubeclientset.Interface) (cache.Store, cache.ControllerInterface) type TargetInformerFactory func(*federationapi.Cluster, kubeclientset.Interface) (cache.Store, cache.Controller)
// A structure with cluster lifecycle handler functions. Cluster is available (and ClusterAvailable is fired) // A structure with cluster lifecycle handler functions. Cluster is available (and ClusterAvailable is fired)
// when it is created in federated etcd and ready. Cluster becomes unavailable (and ClusterUnavailable is fired) // when it is created in federated etcd and ready. Cluster becomes unavailable (and ClusterUnavailable is fired)
@ -242,7 +242,7 @@ func isClusterReady(cluster *federationapi.Cluster) bool {
} }
type informer struct { type informer struct {
controller cache.ControllerInterface controller cache.Controller
store cache.Store store cache.Store
stopChan chan struct{} stopChan chan struct{}
} }

View File

@ -77,7 +77,7 @@ func TestFederatedInformer(t *testing.T) {
return true, watch.NewFake(), nil return true, watch.NewFake(), nil
}) })
targetInformerFactory := func(cluster *federationapi.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { targetInformerFactory := func(cluster *federationapi.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) { ListFunc: func(options apiv1.ListOptions) (runtime.Object, error) {

View File

@ -61,21 +61,21 @@ type Config struct {
type ProcessFunc func(obj interface{}) error type ProcessFunc func(obj interface{}) error
// Controller is a generic controller framework. // Controller is a generic controller framework.
type Controller struct { type controller struct {
config Config config Config
reflector *Reflector reflector *Reflector
reflectorMutex sync.RWMutex reflectorMutex sync.RWMutex
} }
// TODO make the "Controller" private, and convert all references to use ControllerInterface instead type Controller interface {
type ControllerInterface interface {
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
HasSynced() bool HasSynced() bool
LastSyncResourceVersion() string
} }
// New makes a new Controller from the given Config. // New makes a new Controller from the given Config.
func New(c *Config) *Controller { func New(c *Config) Controller {
ctlr := &Controller{ ctlr := &controller{
config: *c, config: *c,
} }
return ctlr return ctlr
@ -84,7 +84,7 @@ func New(c *Config) *Controller {
// Run begins processing items, and will continue until a value is sent down stopCh. // Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once. // It's an error to call Run more than once.
// Run blocks; call via go. // Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) { func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
r := NewReflector( r := NewReflector(
c.config.ListerWatcher, c.config.ListerWatcher,
@ -103,18 +103,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
} }
// Returns true once this controller has completed an initial resource listing // Returns true once this controller has completed an initial resource listing
func (c *Controller) HasSynced() bool { func (c *controller) HasSynced() bool {
return c.config.Queue.HasSynced() return c.config.Queue.HasSynced()
} }
// Requeue adds the provided object back into the queue if it does not already exist. func (c *controller) LastSyncResourceVersion() string {
func (c *Controller) Requeue(obj interface{}) error { if c.reflector == nil {
return c.config.Queue.AddIfNotPresent(Deltas{ return ""
Delta{ }
Type: Sync, return c.reflector.LastSyncResourceVersion()
Object: obj,
},
})
} }
// processLoop drains the work queue. // processLoop drains the work queue.
@ -126,7 +123,7 @@ func (c *Controller) Requeue(obj interface{}) error {
// actually exit when the controller is stopped. Or just give up on this stuff // actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would // ever being stoppable. Converting this whole package to use Context would
// also be helpful. // also be helpful.
func (c *Controller) processLoop() { func (c *controller) processLoop() {
for { for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil { if err != nil {
@ -218,7 +215,7 @@ func NewInformer(
objType runtime.Object, objType runtime.Object,
resyncPeriod time.Duration, resyncPeriod time.Duration,
h ResourceEventHandler, h ResourceEventHandler,
) (Store, *Controller) { ) (Store, Controller) {
// This will hold the client state, as we know it. // This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
@ -284,7 +281,7 @@ func NewIndexerInformer(
resyncPeriod time.Duration, resyncPeriod time.Duration,
h ResourceEventHandler, h ResourceEventHandler,
indexers Indexers, indexers Indexers,
) (Indexer, *Controller) { ) (Indexer, Controller) {
// This will hold the client state, as we know it. // This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

View File

@ -43,7 +43,7 @@ type SharedInformer interface {
AddEventHandler(handler ResourceEventHandler) error AddEventHandler(handler ResourceEventHandler) error
GetStore() Store GetStore() Store
// GetController gives back a synthetic interface that "votes" to start the informer // GetController gives back a synthetic interface that "votes" to start the informer
GetController() ControllerInterface GetController() Controller
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
HasSynced() bool HasSynced() bool
LastSyncResourceVersion() string LastSyncResourceVersion() string
@ -108,7 +108,7 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
type sharedIndexInformer struct { type sharedIndexInformer struct {
indexer Indexer indexer Indexer
controller *Controller controller Controller
processor *sharedProcessor processor *sharedProcessor
cacheMutationDetector CacheMutationDetector cacheMutationDetector CacheMutationDetector
@ -145,6 +145,10 @@ func (v *dummyController) HasSynced() bool {
return v.informer.HasSynced() return v.informer.HasSynced()
} }
func (c *dummyController) LastSyncResourceVersion() string {
return ""
}
type updateNotification struct { type updateNotification struct {
oldObj interface{} oldObj interface{}
newObj interface{} newObj interface{}
@ -207,10 +211,10 @@ func (s *sharedIndexInformer) LastSyncResourceVersion() string {
s.startedLock.Lock() s.startedLock.Lock()
defer s.startedLock.Unlock() defer s.startedLock.Unlock()
if s.controller == nil || s.controller.reflector == nil { if s.controller == nil {
return "" return ""
} }
return s.controller.reflector.LastSyncResourceVersion() return s.controller.LastSyncResourceVersion()
} }
func (s *sharedIndexInformer) GetStore() Store { func (s *sharedIndexInformer) GetStore() Store {
@ -232,7 +236,7 @@ func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
return s.indexer.AddIndexers(indexers) return s.indexer.AddIndexers(indexers)
} }
func (s *sharedIndexInformer) GetController() ControllerInterface { func (s *sharedIndexInformer) GetController() Controller {
return &dummyController{informer: s} return &dummyController{informer: s}
} }

View File

@ -48,7 +48,7 @@ type CertificateController struct {
kubeClient clientset.Interface kubeClient clientset.Interface
// CSR framework and store // CSR framework and store
csrController *cache.Controller csrController cache.Controller
csrStore cache.StoreToCertificateRequestLister csrStore cache.StoreToCertificateRequestLister
syncHandler func(csrKey string) error syncHandler func(csrKey string) error

View File

@ -63,26 +63,26 @@ type DisruptionController struct {
kubeClient clientset.Interface kubeClient clientset.Interface
pdbStore cache.Store pdbStore cache.Store
pdbController *cache.Controller pdbController cache.Controller
pdbLister cache.StoreToPodDisruptionBudgetLister pdbLister cache.StoreToPodDisruptionBudgetLister
podController cache.ControllerInterface podController cache.Controller
podLister cache.StoreToPodLister podLister cache.StoreToPodLister
rcIndexer cache.Indexer rcIndexer cache.Indexer
rcController *cache.Controller rcController cache.Controller
rcLister cache.StoreToReplicationControllerLister rcLister cache.StoreToReplicationControllerLister
rsStore cache.Store rsStore cache.Store
rsController *cache.Controller rsController cache.Controller
rsLister cache.StoreToReplicaSetLister rsLister cache.StoreToReplicaSetLister
dIndexer cache.Indexer dIndexer cache.Indexer
dController *cache.Controller dController cache.Controller
dLister cache.StoreToDeploymentLister dLister cache.StoreToDeploymentLister
ssStore cache.Store ssStore cache.Store
ssController *cache.Controller ssController cache.Controller
ssLister cache.StoreToStatefulSetLister ssLister cache.StoreToStatefulSetLister
// PodDisruptionBudget keys that need to be synced. // PodDisruptionBudget keys that need to be synced.

View File

@ -147,8 +147,8 @@ type EndpointController struct {
// Since we join two objects, we'll watch both of them with // Since we join two objects, we'll watch both of them with
// controllers. // controllers.
serviceController *cache.Controller serviceController cache.Controller
podController cache.ControllerInterface podController cache.Controller
// podStoreSynced returns true if the pod store has been synced at least once. // podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing. // Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool podStoreSynced func() bool

View File

@ -48,7 +48,7 @@ const ResourceResyncTime time.Duration = 0
type monitor struct { type monitor struct {
store cache.Store store cache.Store
controller *cache.Controller controller cache.Controller
} }
type objectReference struct { type objectReference struct {

View File

@ -56,7 +56,7 @@ type NamespaceController struct {
// store that holds the namespaces // store that holds the namespaces
store cache.Store store cache.Store
// controller that observes the namespaces // controller that observes the namespaces
controller *cache.Controller controller cache.Controller
// namespaces that have been queued up for processing by workers // namespaces that have been queued up for processing by workers
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
// function to list of preferred resources for namespace deletion // function to list of preferred resources for namespace deletion

View File

@ -64,12 +64,12 @@ type StatefulSetController struct {
// podStoreSynced returns true if the pod store has synced at least once. // podStoreSynced returns true if the pod store has synced at least once.
podStoreSynced func() bool podStoreSynced func() bool
// Watches changes to all pods. // Watches changes to all pods.
podController cache.ControllerInterface podController cache.Controller
// A store of StatefulSets, populated by the psController. // A store of StatefulSets, populated by the psController.
psStore cache.StoreToStatefulSetLister psStore cache.StoreToStatefulSetLister
// Watches changes to all StatefulSets. // Watches changes to all StatefulSets.
psController *cache.Controller psController cache.Controller
// A store of the 1 unhealthy pet blocking progress for a given ps // A store of the 1 unhealthy pet blocking progress for a given ps
blockingPetStore *unhealthyPetTracker blockingPetStore *unhealthyPetTracker

View File

@ -66,13 +66,13 @@ type HorizontalController struct {
// A store of HPA objects, populated by the controller. // A store of HPA objects, populated by the controller.
store cache.Store store cache.Store
// Watches changes to all HPA objects. // Watches changes to all HPA objects.
controller *cache.Controller controller cache.Controller
} }
var downscaleForbiddenWindow = 5 * time.Minute var downscaleForbiddenWindow = 5 * time.Minute
var upscaleForbiddenWindow = 3 * time.Minute var upscaleForbiddenWindow = 3 * time.Minute
func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, *cache.Controller) { func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, cache.Controller) {
return cache.NewInformer( return cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) { ListFunc: func(options v1.ListOptions) (runtime.Object, error) {

View File

@ -50,7 +50,7 @@ type PodGCController struct {
internalPodInformer cache.SharedIndexInformer internalPodInformer cache.SharedIndexInformer
podStore cache.StoreToPodLister podStore cache.StoreToPodLister
podController cache.ControllerInterface podController cache.Controller
deletePod func(namespace, name string) error deletePod func(namespace, name string) error
terminatedPodThreshold int terminatedPodThreshold int

View File

@ -37,6 +37,10 @@ func (*FakeController) HasSynced() bool {
return true return true
} }
func (*FakeController) LastSyncResourceVersion() string {
return ""
}
func TestGCTerminated(t *testing.T) { func TestGCTerminated(t *testing.T) {
type nameToPhase struct { type nameToPhase struct {
name string name string

View File

@ -86,7 +86,7 @@ type ReplicationManager struct {
// A store of pods, populated by the podController // A store of pods, populated by the podController
podLister cache.StoreToPodLister podLister cache.StoreToPodLister
// Watches changes to all pods // Watches changes to all pods
podController cache.ControllerInterface podController cache.Controller
// podListerSynced returns true if the pod store has been synced at least once. // podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing. // Added as a member to the struct to allow injection for testing.
podListerSynced func() bool podListerSynced func() bool

View File

@ -90,7 +90,7 @@ func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func
type ReplenishmentControllerFactory interface { type ReplenishmentControllerFactory interface {
// NewController returns a controller configured with the specified options. // NewController returns a controller configured with the specified options.
// This method is NOT thread-safe. // This method is NOT thread-safe.
NewController(options *ReplenishmentControllerOptions) (cache.ControllerInterface, error) NewController(options *ReplenishmentControllerOptions) (cache.Controller, error)
} }
// replenishmentControllerFactory implements ReplenishmentControllerFactory // replenishmentControllerFactory implements ReplenishmentControllerFactory
@ -118,7 +118,8 @@ func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface)
func controllerFor( func controllerFor(
groupResource schema.GroupResource, groupResource schema.GroupResource,
f informers.SharedInformerFactory, f informers.SharedInformerFactory,
handlerFuncs cache.ResourceEventHandlerFuncs) (cache.ControllerInterface, error) { handlerFuncs cache.ResourceEventHandlerFuncs,
) (cache.Controller, error) {
genericInformer, err := f.ForResource(groupResource) genericInformer, err := f.ForResource(groupResource)
if err != nil { if err != nil {
return nil, err return nil, err
@ -128,7 +129,7 @@ func controllerFor(
return informer.GetController(), nil return informer.GetController(), nil
} }
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.ControllerInterface, err error) { func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.Controller, err error) {
if r.kubeClient != nil && r.kubeClient.Core().RESTClient().GetRateLimiter() != nil { if r.kubeClient != nil && r.kubeClient.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller", r.kubeClient.Core().RESTClient().GetRateLimiter()) metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller", r.kubeClient.Core().RESTClient().GetRateLimiter())
} }
@ -276,7 +277,7 @@ func IsUnhandledGroupKindError(err error) bool {
// returning the first success or failure it hits. If there are no hits either way, it return an UnhandledGroupKind error // returning the first success or failure it hits. If there are no hits either way, it return an UnhandledGroupKind error
type UnionReplenishmentControllerFactory []ReplenishmentControllerFactory type UnionReplenishmentControllerFactory []ReplenishmentControllerFactory
func (f UnionReplenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.ControllerInterface, error) { func (f UnionReplenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (cache.Controller, error) {
for _, factory := range f { for _, factory := range f {
controller, err := factory.NewController(options) controller, err := factory.NewController(options)
if !IsUnhandledGroupKindError(err) { if !IsUnhandledGroupKindError(err) {

View File

@ -60,7 +60,7 @@ type ResourceQuotaController struct {
// An index of resource quota objects by namespace // An index of resource quota objects by namespace
rqIndexer cache.Indexer rqIndexer cache.Indexer
// Watches changes to all resource quota // Watches changes to all resource quota
rqController *cache.Controller rqController cache.Controller
// ResourceQuota objects that need to be synchronized // ResourceQuota objects that need to be synchronized
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
// missingUsageQueue holds objects that are missing the initial usage informatino // missingUsageQueue holds objects that are missing the initial usage informatino
@ -72,7 +72,7 @@ type ResourceQuotaController struct {
// knows how to calculate usage // knows how to calculate usage
registry quota.Registry registry quota.Registry
// controllers monitoring to notify for replenishment // controllers monitoring to notify for replenishment
replenishmentControllers []cache.ControllerInterface replenishmentControllers []cache.Controller
} }
func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController { func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
@ -83,7 +83,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"), missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
resyncPeriod: options.ResyncPeriod, resyncPeriod: options.ResyncPeriod,
registry: options.Registry, registry: options.Registry,
replenishmentControllers: []cache.ControllerInterface{}, replenishmentControllers: []cache.Controller{},
} }
if options.KubeClient != nil && options.KubeClient.Core().RESTClient().GetRateLimiter() != nil { if options.KubeClient != nil && options.KubeClient.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", options.KubeClient.Core().RESTClient().GetRateLimiter()) metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", options.KubeClient.Core().RESTClient().GetRateLimiter())

View File

@ -54,7 +54,7 @@ type RouteController struct {
clusterName string clusterName string
clusterCIDR *net.IPNet clusterCIDR *net.IPNet
// Node framework and store // Node framework and store
nodeController *cache.Controller nodeController cache.Controller
nodeStore cache.StoreToNodeLister nodeStore cache.StoreToNodeLister
} }

View File

@ -87,7 +87,7 @@ type ServiceController struct {
// A store of services, populated by the serviceController // A store of services, populated by the serviceController
serviceStore cache.StoreToServiceLister serviceStore cache.StoreToServiceLister
// Watches changes to all services // Watches changes to all services
serviceController *cache.Controller serviceController cache.Controller
eventBroadcaster record.EventBroadcaster eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder eventRecorder record.EventRecorder
nodeLister cache.StoreToNodeLister nodeLister cache.StoreToNodeLister

View File

@ -145,8 +145,8 @@ type TokensController struct {
secrets cache.Indexer secrets cache.Indexer
// Since we join two objects, we'll watch both of them with controllers. // Since we join two objects, we'll watch both of them with controllers.
serviceAccountController *cache.Controller serviceAccountController cache.Controller
secretController *cache.Controller secretController cache.Controller
// syncServiceAccountQueue handles service account events: // syncServiceAccountQueue handles service account events:
// * ensures a referenced token exists for service accounts which still exist // * ensures a referenced token exists for service accounts which still exist

View File

@ -146,10 +146,10 @@ const createProvisionedPVInterval = 10 * time.Second
// cache.Controllers that watch PersistentVolume and PersistentVolumeClaim // cache.Controllers that watch PersistentVolume and PersistentVolumeClaim
// changes. // changes.
type PersistentVolumeController struct { type PersistentVolumeController struct {
volumeController *cache.Controller volumeController cache.Controller
volumeInformer cache.Indexer volumeInformer cache.Indexer
volumeSource cache.ListerWatcher volumeSource cache.ListerWatcher
claimController *cache.Controller claimController cache.Controller
claimInformer cache.Store claimInformer cache.Store
claimSource cache.ListerWatcher claimSource cache.ListerWatcher
classReflector *cache.Reflector classReflector *cache.Reflector

View File

@ -80,12 +80,12 @@ type ConfigFactory struct {
StopEverything chan struct{} StopEverything chan struct{}
informerFactory informers.SharedInformerFactory informerFactory informers.SharedInformerFactory
scheduledPodPopulator cache.ControllerInterface scheduledPodPopulator cache.Controller
nodePopulator cache.ControllerInterface nodePopulator cache.Controller
pvPopulator cache.ControllerInterface pvPopulator cache.Controller
pvcPopulator cache.ControllerInterface pvcPopulator cache.Controller
servicePopulator cache.ControllerInterface servicePopulator cache.Controller
controllerPopulator cache.ControllerInterface controllerPopulator cache.Controller
schedulerCache schedulercache.Cache schedulerCache schedulercache.Cache

View File

@ -61,21 +61,20 @@ type Config struct {
type ProcessFunc func(obj interface{}) error type ProcessFunc func(obj interface{}) error
// Controller is a generic controller framework. // Controller is a generic controller framework.
type Controller struct { type controller struct {
config Config config Config
reflector *Reflector reflector *Reflector
reflectorMutex sync.RWMutex reflectorMutex sync.RWMutex
} }
// TODO make the "Controller" private, and convert all references to use ControllerInterface instead type Controller interface {
type ControllerInterface interface {
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
HasSynced() bool HasSynced() bool
} }
// New makes a new Controller from the given Config. // New makes a new Controller from the given Config.
func New(c *Config) *Controller { func New(c *Config) Controller {
ctlr := &Controller{ ctlr := &controller{
config: *c, config: *c,
} }
return ctlr return ctlr
@ -84,7 +83,7 @@ func New(c *Config) *Controller {
// Run begins processing items, and will continue until a value is sent down stopCh. // Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once. // It's an error to call Run more than once.
// Run blocks; call via go. // Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) { func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
r := NewReflector( r := NewReflector(
c.config.ListerWatcher, c.config.ListerWatcher,
@ -103,18 +102,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
} }
// Returns true once this controller has completed an initial resource listing // Returns true once this controller has completed an initial resource listing
func (c *Controller) HasSynced() bool { func (c *controller) HasSynced() bool {
return c.config.Queue.HasSynced() return c.config.Queue.HasSynced()
} }
// Requeue adds the provided object back into the queue if it does not already exist. func (c *controller) LastSyncResourceVersion() string {
func (c *Controller) Requeue(obj interface{}) error { if c.reflector == nil {
return c.config.Queue.AddIfNotPresent(Deltas{ return ""
Delta{ }
Type: Sync, return c.reflector.LastSyncResourceVersion()
Object: obj,
},
})
} }
// processLoop drains the work queue. // processLoop drains the work queue.
@ -126,7 +122,7 @@ func (c *Controller) Requeue(obj interface{}) error {
// actually exit when the controller is stopped. Or just give up on this stuff // actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would // ever being stoppable. Converting this whole package to use Context would
// also be helpful. // also be helpful.
func (c *Controller) processLoop() { func (c *controller) processLoop() {
for { for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil { if err != nil {
@ -218,7 +214,7 @@ func NewInformer(
objType runtime.Object, objType runtime.Object,
resyncPeriod time.Duration, resyncPeriod time.Duration,
h ResourceEventHandler, h ResourceEventHandler,
) (Store, *Controller) { ) (Store, Controller) {
// This will hold the client state, as we know it. // This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
@ -284,7 +280,7 @@ func NewIndexerInformer(
resyncPeriod time.Duration, resyncPeriod time.Duration,
h ResourceEventHandler, h ResourceEventHandler,
indexers Indexers, indexers Indexers,
) (Indexer, *Controller) { ) (Indexer, Controller) {
// This will hold the client state, as we know it. // This will hold the client state, as we know it.
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

View File

@ -43,7 +43,7 @@ type SharedInformer interface {
AddEventHandler(handler ResourceEventHandler) error AddEventHandler(handler ResourceEventHandler) error
GetStore() Store GetStore() Store
// GetController gives back a synthetic interface that "votes" to start the informer // GetController gives back a synthetic interface that "votes" to start the informer
GetController() ControllerInterface GetController() Controller
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
HasSynced() bool HasSynced() bool
LastSyncResourceVersion() string LastSyncResourceVersion() string
@ -108,7 +108,7 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
type sharedIndexInformer struct { type sharedIndexInformer struct {
indexer Indexer indexer Indexer
controller *Controller controller Controller
processor *sharedProcessor processor *sharedProcessor
cacheMutationDetector CacheMutationDetector cacheMutationDetector CacheMutationDetector
@ -145,6 +145,10 @@ func (v *dummyController) HasSynced() bool {
return v.informer.HasSynced() return v.informer.HasSynced()
} }
func (c *dummyController) LastSyncResourceVersion() string {
return ""
}
type updateNotification struct { type updateNotification struct {
oldObj interface{} oldObj interface{}
newObj interface{} newObj interface{}
@ -232,7 +236,7 @@ func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
return s.indexer.AddIndexers(indexers) return s.indexer.AddIndexers(indexers)
} }
func (s *sharedIndexInformer) GetController() ControllerInterface { func (s *sharedIndexInformer) GetController() Controller {
return &dummyController{informer: s} return &dummyController{informer: s}
} }

View File

@ -192,7 +192,7 @@ var _ = framework.KubeDescribe("DaemonRestart [Disruptive]", func() {
existingPods := cache.NewStore(cache.MetaNamespaceKeyFunc) existingPods := cache.NewStore(cache.MetaNamespaceKeyFunc)
var ns string var ns string
var config testutils.RCConfig var config testutils.RCConfig
var controller *cache.Controller var controller cache.Controller
var newPods cache.Store var newPods cache.Store
var stopCh chan struct{} var stopCh chan struct{}
var tracker *podTracker var tracker *podTracker

View File

@ -188,7 +188,7 @@ var _ = framework.KubeDescribe("Network Partition [Disruptive] [Slow]", func() {
nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name) nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
newNode := make(chan *v1.Node) newNode := make(chan *v1.Node)
var controller *cache.Controller var controller cache.Controller
_, controller = cache.NewInformer( _, controller = cache.NewInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) { ListFunc: func(options v1.ListOptions) (runtime.Object, error) {

View File

@ -476,8 +476,7 @@ func verifyPodStartupLatency(expect, actual framework.LatencyMetric) error {
} }
// newInformerWatchPod creates an informer to check whether all pods are running. // newInformerWatchPod creates an informer to check whether all pods are running.
func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]metav1.Time, func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]metav1.Time, podType string) cache.Controller {
podType string) *cache.Controller {
ns := f.Namespace.Name ns := f.Namespace.Name
checkPodRunning := func(p *v1.Pod) { checkPodRunning := func(p *v1.Pod) {
mutex.Lock() mutex.Lock()