From 7e4c3096fe71afc6a23c273b3309ed5db7289d8c Mon Sep 17 00:00:00 2001 From: Yassine TIJANI Date: Fri, 16 Aug 2019 21:15:53 +0200 Subject: [PATCH] move WaitForCacheSync to the sharedInformer package Signed-off-by: Yassine TIJANI --- pkg/controller/BUILD | 1 - pkg/controller/bootstrap/bootstrapsigner.go | 3 +-- pkg/controller/bootstrap/tokencleaner.go | 2 +- .../certificates/certificate_controller.go | 2 +- .../certificates/rootcacertpublisher/BUILD | 1 - .../rootcacertpublisher/publisher.go | 5 ++--- .../clusterroleaggregation_controller.go | 2 +- pkg/controller/controller_utils.go | 16 ---------------- pkg/controller/daemon/daemon_controller.go | 2 +- .../deployment/deployment_controller.go | 2 +- pkg/controller/disruption/disruption.go | 2 +- pkg/controller/endpoint/endpoints_controller.go | 2 +- .../endpoint/endpoints_controller_test.go | 2 +- .../garbagecollector/garbagecollector.go | 5 +++-- .../garbagecollector/garbagecollector_test.go | 2 +- pkg/controller/job/job_controller.go | 2 +- pkg/controller/namespace/namespace_controller.go | 2 +- pkg/controller/nodeipam/BUILD | 1 - pkg/controller/nodeipam/ipam/BUILD | 1 - .../nodeipam/ipam/cloud_cidr_allocator.go | 3 +-- pkg/controller/nodeipam/ipam/range_allocator.go | 3 +-- pkg/controller/nodeipam/node_ipam_controller.go | 3 +-- .../nodelifecycle/node_lifecycle_controller.go | 2 +- pkg/controller/podautoscaler/BUILD | 1 + pkg/controller/podautoscaler/horizontal.go | 2 +- .../legacy_replica_calculator_test.go | 3 ++- .../podautoscaler/replica_calculator_test.go | 3 ++- pkg/controller/podgc/BUILD | 1 - pkg/controller/podgc/gc_controller.go | 3 +-- pkg/controller/replicaset/replica_set.go | 2 +- .../resourcequota/resource_quota_controller.go | 4 ++-- .../resource_quota_controller_test.go | 2 +- pkg/controller/route/BUILD | 1 - pkg/controller/route/route_controller.go | 3 +-- pkg/controller/service/service_controller.go | 2 +- pkg/controller/serviceaccount/BUILD | 1 - .../serviceaccount/serviceaccounts_controller.go | 3 +-- .../serviceaccount/tokens_controller.go | 3 +-- pkg/controller/statefulset/stateful_set.go | 2 +- pkg/controller/ttl/ttl_controller.go | 2 +- .../ttlafterfinished_controller.go | 2 +- pkg/controller/volume/attachdetach/BUILD | 2 +- .../attachdetach/attach_detach_controller.go | 3 +-- .../attach_detach_controller_test.go | 5 +++-- pkg/controller/volume/expand/BUILD | 1 - .../volume/expand/expand_controller.go | 3 +-- .../persistentvolume/pv_controller_base.go | 2 +- pkg/controller/volume/pvcprotection/BUILD | 1 - .../pvcprotection/pvc_protection_controller.go | 3 +-- pkg/controller/volume/pvprotection/BUILD | 1 - .../pvprotection/pv_protection_controller.go | 3 +-- pkg/kubemark/BUILD | 1 - pkg/kubemark/controller.go | 3 +-- pkg/master/controller/crdregistration/BUILD | 1 - .../crdregistration_controller.go | 3 +-- pkg/proxy/config/BUILD | 1 - pkg/proxy/config/config.go | 5 ++--- .../k8s.io/client-go/examples/workqueue/main.go | 2 +- .../client-go/tools/cache/shared_informer.go | 16 ++++++++++++++++ test/integration/scheduler/BUILD | 2 +- test/integration/scheduler/util.go | 4 ++-- 61 files changed, 72 insertions(+), 96 deletions(-) diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index c4a87570d6c..4a70c1ff19d 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -74,7 +74,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/pkg/controller/bootstrap/bootstrapsigner.go b/pkg/controller/bootstrap/bootstrapsigner.go index 713022bf641..2ea987f76fb 100644 --- a/pkg/controller/bootstrap/bootstrapsigner.go +++ b/pkg/controller/bootstrap/bootstrapsigner.go @@ -37,7 +37,6 @@ import ( bootstrapapi "k8s.io/cluster-bootstrap/token/api" jws "k8s.io/cluster-bootstrap/token/jws" api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" ) @@ -159,7 +158,7 @@ func (e *Signer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer e.syncQueue.ShutDown() - if !controller.WaitForCacheSync("bootstrap_signer", stopCh, e.configMapSynced, e.secretSynced) { + if !cache.WaitForNamedCacheSync("bootstrap_signer", stopCh, e.configMapSynced, e.secretSynced) { return } diff --git a/pkg/controller/bootstrap/tokencleaner.go b/pkg/controller/bootstrap/tokencleaner.go index 3249c67ca8f..d41552d70ce 100644 --- a/pkg/controller/bootstrap/tokencleaner.go +++ b/pkg/controller/bootstrap/tokencleaner.go @@ -117,7 +117,7 @@ func (tc *TokenCleaner) Run(stopCh <-chan struct{}) { klog.Infof("Starting token cleaner controller") defer klog.Infof("Shutting down token cleaner controller") - if !controller.WaitForCacheSync("token_cleaner", stopCh, tc.secretSynced) { + if !cache.WaitForNamedCacheSync("token_cleaner", stopCh, tc.secretSynced) { return } diff --git a/pkg/controller/certificates/certificate_controller.go b/pkg/controller/certificates/certificate_controller.go index 2ec79936ef2..36698cb036e 100644 --- a/pkg/controller/certificates/certificate_controller.go +++ b/pkg/controller/certificates/certificate_controller.go @@ -113,7 +113,7 @@ func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting certificate controller") defer klog.Infof("Shutting down certificate controller") - if !controller.WaitForCacheSync("certificate", stopCh, cc.csrsSynced) { + if !cache.WaitForNamedCacheSync("certificate", stopCh, cc.csrsSynced) { return } diff --git a/pkg/controller/certificates/rootcacertpublisher/BUILD b/pkg/controller/certificates/rootcacertpublisher/BUILD index 343843a7e64..0e608bf9ed8 100644 --- a/pkg/controller/certificates/rootcacertpublisher/BUILD +++ b/pkg/controller/certificates/rootcacertpublisher/BUILD @@ -6,7 +6,6 @@ go_library( importpath = "k8s.io/kubernetes/pkg/controller/certificates/rootcacertpublisher", visibility = ["//visibility:public"], deps = [ - "//pkg/controller:go_default_library", "//pkg/util/metrics:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", diff --git a/pkg/controller/certificates/rootcacertpublisher/publisher.go b/pkg/controller/certificates/rootcacertpublisher/publisher.go index 1d4377781c8..d4c6cc84cb8 100644 --- a/pkg/controller/certificates/rootcacertpublisher/publisher.go +++ b/pkg/controller/certificates/rootcacertpublisher/publisher.go @@ -21,7 +21,7 @@ import ( "reflect" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -32,7 +32,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" ) @@ -98,7 +97,7 @@ func (c *Publisher) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting root CA certificate configmap publisher") defer klog.Infof("Shutting down root CA certificate configmap publisher") - if !controller.WaitForCacheSync("crt configmap", stopCh, c.cmListerSynced) { + if !cache.WaitForNamedCacheSync("crt configmap", stopCh, c.cmListerSynced) { return } diff --git a/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go index b5c9f873dbc..4548e4c76d1 100644 --- a/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go +++ b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go @@ -148,7 +148,7 @@ func (c *ClusterRoleAggregationController) Run(workers int, stopCh <-chan struct klog.Infof("Starting ClusterRoleAggregator") defer klog.Infof("Shutting down ClusterRoleAggregator") - if !controller.WaitForCacheSync("ClusterRoleAggregator", stopCh, c.clusterRolesSynced) { + if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", stopCh, c.clusterRolesSynced) { return } diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 6e696439789..025bf9d5ace 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -35,7 +35,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/rand" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" @@ -1022,21 +1021,6 @@ func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, n return err } -// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages -// indicating that the controller identified by controllerName is waiting for syncs, followed by -// either a successful or failed sync. -func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool { - klog.Infof("Waiting for caches to sync for %s controller", controllerName) - - if !cache.WaitForCacheSync(stopCh, cacheSyncs...) { - utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s controller", controllerName)) - return false - } - - klog.Infof("Caches are synced for %s controller", controllerName) - return true -} - // ComputeHash returns a hash value calculated from pod template and // a collisionCount to avoid hash collision. The hash will be safe encoded to // avoid bad words. diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 6124f7bffae..b10b5c3e6f1 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -267,7 +267,7 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting daemon sets controller") defer klog.Infof("Shutting down daemon sets controller") - if !controller.WaitForCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) { + if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) { return } diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index cea0156ecc4..77e8b7f43f7 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -152,7 +152,7 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting deployment controller") defer klog.Infof("Shutting down deployment controller") - if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { + if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { return } diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index c2468e913fd..6fe7d16b222 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -333,7 +333,7 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) { klog.Infof("Starting disruption controller") defer klog.Infof("Shutting down disruption controller") - if !controller.WaitForCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) { + if !cache.WaitForNamedCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) { return } diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index b2157e75f70..907b4786721 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -170,7 +170,7 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting endpoint controller") defer klog.Infof("Shutting down endpoint controller") - if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) { + if !cache.WaitForNamedCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) { return } diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 2d1d200ed5f..6c919626b3e 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -749,7 +749,7 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) { defer close(stopCh) go endpoints.Run(1, stopCh) - // cache.WaitForCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period. + // cache.WaitForNamedCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period. // To ensure we get all updates, including unexpected ones, we need to wait at least as long as // a single cache sync period and worker period, with some fudge room. time.Sleep(150 * time.Millisecond) diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 75251f8718e..814c15a33e8 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/metadata" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controller" @@ -131,7 +132,7 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) { go gc.dependencyGraphBuilder.Run(stopCh) - if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) { + if !cache.WaitForNamedCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) { return } @@ -225,7 +226,7 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf // informers keep attempting to sync in the background, so retrying doesn't interrupt them. // the call to resyncMonitors on the reattempt will no-op for resources that still exist. // note that workers stay paused until we successfully resync. - if !controller.WaitForCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) { + if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) { utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt)) return false, nil } diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 766bd5423f6..15d96d73499 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -837,7 +837,7 @@ func TestGarbageCollectorSync(t *testing.T) { // wait.PollImmediateUntil() loops with 100ms (hardcode) util the `stopCh` is closed: // GetDeletableResources() // gc.resyncMonitors() - // controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced. + // cache.WaitForNamedCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced. // // Setting the period to 200ms allows the WaitForCacheSync() to check // for cache sync ~2 times in every wait.PollImmediateUntil() loop. diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 50af075a6f4..62164751c9e 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -143,7 +143,7 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting job controller") defer klog.Infof("Shutting down job controller") - if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) { + if !cache.WaitForNamedCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) { return } diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index b5d374d4f56..73a6912ea55 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -186,7 +186,7 @@ func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting namespace controller") defer klog.Infof("Shutting down namespace controller") - if !controller.WaitForCacheSync("namespace", stopCh, nm.listerSynced) { + if !cache.WaitForNamedCacheSync("namespace", stopCh, nm.listerSynced) { return } diff --git a/pkg/controller/nodeipam/BUILD b/pkg/controller/nodeipam/BUILD index 944e519b1c9..ac823d9c0f0 100644 --- a/pkg/controller/nodeipam/BUILD +++ b/pkg/controller/nodeipam/BUILD @@ -31,7 +31,6 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/controller/nodeipam", deps = [ - "//pkg/controller:go_default_library", "//pkg/controller/nodeipam/ipam:go_default_library", "//pkg/controller/nodeipam/ipam/sync:go_default_library", "//pkg/util/metrics:go_default_library", diff --git a/pkg/controller/nodeipam/ipam/BUILD b/pkg/controller/nodeipam/ipam/BUILD index b50ce534ddb..d2cbea52441 100644 --- a/pkg/controller/nodeipam/ipam/BUILD +++ b/pkg/controller/nodeipam/ipam/BUILD @@ -42,7 +42,6 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam", deps = [ - "//pkg/controller:go_default_library", "//pkg/controller/nodeipam/ipam/cidrset:go_default_library", "//pkg/controller/nodeipam/ipam/sync:go_default_library", "//pkg/controller/util/node:go_default_library", diff --git a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go index c5fde54632b..64b4846d678 100644 --- a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go +++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go @@ -39,7 +39,6 @@ import ( "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" cloudprovider "k8s.io/cloud-provider" - "k8s.io/kubernetes/pkg/controller" nodeutil "k8s.io/kubernetes/pkg/controller/util/node" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -136,7 +135,7 @@ func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) { klog.Infof("Starting cloud CIDR allocator") defer klog.Infof("Shutting down cloud CIDR allocator") - if !controller.WaitForCacheSync("cidrallocator", stopCh, ca.nodesSynced) { + if !cache.WaitForNamedCacheSync("cidrallocator", stopCh, ca.nodesSynced) { return } diff --git a/pkg/controller/nodeipam/ipam/range_allocator.go b/pkg/controller/nodeipam/ipam/range_allocator.go index 09c33892f59..4763450a866 100644 --- a/pkg/controller/nodeipam/ipam/range_allocator.go +++ b/pkg/controller/nodeipam/ipam/range_allocator.go @@ -35,7 +35,6 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset" nodeutil "k8s.io/kubernetes/pkg/controller/util/node" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -167,7 +166,7 @@ func (r *rangeAllocator) Run(stopCh <-chan struct{}) { klog.Infof("Starting range CIDR allocator") defer klog.Infof("Shutting down range CIDR allocator") - if !controller.WaitForCacheSync("cidrallocator", stopCh, r.nodesSynced) { + if !cache.WaitForNamedCacheSync("cidrallocator", stopCh, r.nodesSynced) { return } diff --git a/pkg/controller/nodeipam/node_ipam_controller.go b/pkg/controller/nodeipam/node_ipam_controller.go index 337e7ecfd54..0bd21118471 100644 --- a/pkg/controller/nodeipam/node_ipam_controller.go +++ b/pkg/controller/nodeipam/node_ipam_controller.go @@ -33,7 +33,6 @@ import ( clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" cloudprovider "k8s.io/cloud-provider" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/nodeipam/ipam" nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync" "k8s.io/kubernetes/pkg/util/metrics" @@ -179,7 +178,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) { klog.Infof("Starting ipam controller") defer klog.Infof("Shutting down ipam controller") - if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced) { + if !cache.WaitForNamedCacheSync("node", stopCh, nc.nodeInformerSynced) { return } diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go index b0ebcc6df90..2cf549ea5c2 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -455,7 +455,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) { klog.Infof("Starting node controller") defer klog.Infof("Shutting down node controller") - if !controller.WaitForCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) { + if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) { return } diff --git a/pkg/controller/podautoscaler/BUILD b/pkg/controller/podautoscaler/BUILD index e82ee00ec69..c6ad0ce212c 100644 --- a/pkg/controller/podautoscaler/BUILD +++ b/pkg/controller/podautoscaler/BUILD @@ -78,6 +78,7 @@ go_test( "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/scale/fake:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/metrics/pkg/apis/custom_metrics/v1beta2:go_default_library", "//staging/src/k8s.io/metrics/pkg/apis/external_metrics/v1beta1:go_default_library", "//staging/src/k8s.io/metrics/pkg/apis/metrics/v1alpha1:go_default_library", diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index b3426b6ae71..31aea7afb44 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -156,7 +156,7 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) { klog.Infof("Starting HPA controller") defer klog.Infof("Shutting down HPA controller") - if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) { + if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) { return } diff --git a/pkg/controller/podautoscaler/legacy_replica_calculator_test.go b/pkg/controller/podautoscaler/legacy_replica_calculator_test.go index 083d2bdd258..cb472332ba1 100644 --- a/pkg/controller/podautoscaler/legacy_replica_calculator_test.go +++ b/pkg/controller/podautoscaler/legacy_replica_calculator_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/kubernetes/fake" restclient "k8s.io/client-go/rest" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" @@ -196,7 +197,7 @@ func (tc *legacyReplicaCalcTestCase) runTest(t *testing.T) { stop := make(chan struct{}) defer close(stop) informerFactory.Start(stop) - if !controller.WaitForCacheSync("HPA", stop, informer.Informer().HasSynced) { + if !cache.WaitForNamedCacheSync("HPA", stop, informer.Informer().HasSynced) { return } diff --git a/pkg/controller/podautoscaler/replica_calculator_test.go b/pkg/controller/podautoscaler/replica_calculator_test.go index 22272766038..7d392eab802 100644 --- a/pkg/controller/podautoscaler/replica_calculator_test.go +++ b/pkg/controller/podautoscaler/replica_calculator_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/controller" metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" @@ -349,7 +350,7 @@ func (tc *replicaCalcTestCase) runTest(t *testing.T) { stop := make(chan struct{}) defer close(stop) informerFactory.Start(stop) - if !controller.WaitForCacheSync("HPA", stop, informer.Informer().HasSynced) { + if !cache.WaitForNamedCacheSync("HPA", stop, informer.Informer().HasSynced) { return } diff --git a/pkg/controller/podgc/BUILD b/pkg/controller/podgc/BUILD index edbdeb4ae30..a44f2c18f9a 100644 --- a/pkg/controller/podgc/BUILD +++ b/pkg/controller/podgc/BUILD @@ -14,7 +14,6 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/controller/podgc", deps = [ - "//pkg/controller:go_default_library", "//pkg/util/metrics:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/controller/podgc/gc_controller.go b/pkg/controller/podgc/gc_controller.go index a288bc86b37..14b08eca46c 100644 --- a/pkg/controller/podgc/gc_controller.go +++ b/pkg/controller/podgc/gc_controller.go @@ -31,7 +31,6 @@ import ( clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" "k8s.io/klog" @@ -76,7 +75,7 @@ func (gcc *PodGCController) Run(stop <-chan struct{}) { klog.Infof("Starting GC controller") defer klog.Infof("Shutting down GC controller") - if !controller.WaitForCacheSync("GC", stop, gcc.podListerSynced) { + if !cache.WaitForNamedCacheSync("GC", stop, gcc.podListerSynced) { return } diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 169fa85485c..b61330afd57 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -182,7 +182,7 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting %v controller", controllerName) defer klog.Infof("Shutting down %v controller", controllerName) - if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) { + if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) { return } diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 4c32eb4896f..6a1db2a2a2d 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -275,7 +275,7 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) { go rq.quotaMonitor.Run(stopCh) } - if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) { + if !cache.WaitForNamedCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) { return } @@ -442,7 +442,7 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p // this protects us from deadlocks where available resources changed and one of our informer caches will never fill. // informers keep attempting to sync in the background, so retrying doesn't interrupt them. // the call to resyncMonitors on the reattempt will no-op for resources that still exist. - if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) { + if rq.quotaMonitor != nil && !cache.WaitForNamedCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) { utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync")) return } diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go index be09292777f..cd3a47aadb8 100644 --- a/pkg/controller/resourcequota/resource_quota_controller_test.go +++ b/pkg/controller/resourcequota/resource_quota_controller_test.go @@ -1066,7 +1066,7 @@ func TestDiscoverySync(t *testing.T) { // wait.Until() loops with `period` until the `stopCh` is closed : // GetQuotableResources() // resyncMonitors() - // controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced. + // cache.WaitForNamedCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced. // // Setting the period to 200ms allows the WaitForCacheSync() to check // for cache sync ~2 times in every wait.Until() loop. diff --git a/pkg/controller/route/BUILD b/pkg/controller/route/BUILD index accb9a40b07..848656bf7b8 100644 --- a/pkg/controller/route/BUILD +++ b/pkg/controller/route/BUILD @@ -14,7 +14,6 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/controller/route", deps = [ - "//pkg/controller:go_default_library", "//pkg/controller/util/node:go_default_library", "//pkg/util/metrics:go_default_library", "//pkg/util/node:go_default_library", diff --git a/pkg/controller/route/route_controller.go b/pkg/controller/route/route_controller.go index c2032ef8e1c..3f0dbe5b760 100644 --- a/pkg/controller/route/route_controller.go +++ b/pkg/controller/route/route_controller.go @@ -40,7 +40,6 @@ import ( "k8s.io/client-go/tools/record" clientretry "k8s.io/client-go/util/retry" cloudprovider "k8s.io/cloud-provider" - "k8s.io/kubernetes/pkg/controller" nodeutil "k8s.io/kubernetes/pkg/controller/util/node" "k8s.io/kubernetes/pkg/util/metrics" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -102,7 +101,7 @@ func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration) klog.Info("Starting route controller") defer klog.Info("Shutting down route controller") - if !controller.WaitForCacheSync("route", stopCh, rc.nodeListerSynced) { + if !cache.WaitForNamedCacheSync("route", stopCh, rc.nodeListerSynced) { return } diff --git a/pkg/controller/service/service_controller.go b/pkg/controller/service/service_controller.go index 8fd896bfcc2..9985b37b0bd 100644 --- a/pkg/controller/service/service_controller.go +++ b/pkg/controller/service/service_controller.go @@ -197,7 +197,7 @@ func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) { klog.Info("Starting service controller") defer klog.Info("Shutting down service controller") - if !controller.WaitForCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced) { + if !cache.WaitForNamedCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced) { return } diff --git a/pkg/controller/serviceaccount/BUILD b/pkg/controller/serviceaccount/BUILD index 7696f778c58..3beb71374b2 100644 --- a/pkg/controller/serviceaccount/BUILD +++ b/pkg/controller/serviceaccount/BUILD @@ -16,7 +16,6 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/controller/serviceaccount", deps = [ - "//pkg/controller:go_default_library", "//pkg/registry/core/secret:go_default_library", "//pkg/serviceaccount:go_default_library", "//pkg/util/metrics:go_default_library", diff --git a/pkg/controller/serviceaccount/serviceaccounts_controller.go b/pkg/controller/serviceaccount/serviceaccounts_controller.go index c31ba1e0ee6..a9e99003c28 100644 --- a/pkg/controller/serviceaccount/serviceaccounts_controller.go +++ b/pkg/controller/serviceaccount/serviceaccounts_controller.go @@ -32,7 +32,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" ) @@ -117,7 +116,7 @@ func (c *ServiceAccountsController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting service account controller") defer klog.Infof("Shutting down service account controller") - if !controller.WaitForCacheSync("service account", stopCh, c.saListerSynced, c.nsListerSynced) { + if !cache.WaitForNamedCacheSync("service account", stopCh, c.saListerSynced, c.nsListerSynced) { return } diff --git a/pkg/controller/serviceaccount/tokens_controller.go b/pkg/controller/serviceaccount/tokens_controller.go index 9404a64c800..c5531ec9ffb 100644 --- a/pkg/controller/serviceaccount/tokens_controller.go +++ b/pkg/controller/serviceaccount/tokens_controller.go @@ -36,7 +36,6 @@ import ( clientretry "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/registry/core/secret" "k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/util/metrics" @@ -169,7 +168,7 @@ func (e *TokensController) Run(workers int, stopCh <-chan struct{}) { defer e.syncServiceAccountQueue.ShutDown() defer e.syncSecretQueue.ShutDown() - if !controller.WaitForCacheSync("tokens", stopCh, e.serviceAccountSynced, e.secretSynced) { + if !cache.WaitForNamedCacheSync("tokens", stopCh, e.serviceAccountSynced, e.secretSynced) { return } diff --git a/pkg/controller/statefulset/stateful_set.go b/pkg/controller/statefulset/stateful_set.go index cf078dc20dc..8f6122ba8d7 100644 --- a/pkg/controller/statefulset/stateful_set.go +++ b/pkg/controller/statefulset/stateful_set.go @@ -145,7 +145,7 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting stateful set controller") defer klog.Infof("Shutting down statefulset controller") - if !controller.WaitForCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) { + if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) { return } diff --git a/pkg/controller/ttl/ttl_controller.go b/pkg/controller/ttl/ttl_controller.go index e9b6728e67e..dbd4c651b32 100644 --- a/pkg/controller/ttl/ttl_controller.go +++ b/pkg/controller/ttl/ttl_controller.go @@ -116,7 +116,7 @@ func (ttlc *TTLController) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting TTL controller") defer klog.Infof("Shutting down TTL controller") - if !controller.WaitForCacheSync("TTL", stopCh, ttlc.hasSynced) { + if !cache.WaitForNamedCacheSync("TTL", stopCh, ttlc.hasSynced) { return } diff --git a/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go b/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go index df8bc016144..8e55660b969 100644 --- a/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go +++ b/pkg/controller/ttlafterfinished/ttlafterfinished_controller.go @@ -105,7 +105,7 @@ func (tc *Controller) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting TTL after finished controller") defer klog.Infof("Shutting down TTL after finished controller") - if !controller.WaitForCacheSync("TTL after finished", stopCh, tc.jListerSynced) { + if !cache.WaitForNamedCacheSync("TTL after finished", stopCh, tc.jListerSynced) { return } diff --git a/pkg/controller/volume/attachdetach/BUILD b/pkg/controller/volume/attachdetach/BUILD index a4a84dbe337..e6621a7db26 100644 --- a/pkg/controller/volume/attachdetach/BUILD +++ b/pkg/controller/volume/attachdetach/BUILD @@ -11,7 +11,6 @@ go_library( srcs = ["attach_detach_controller.go"], importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach", deps = [ - "//pkg/controller:go_default_library", "//pkg/controller/volume/attachdetach/cache:go_default_library", "//pkg/controller/volume/attachdetach/metrics:go_default_library", "//pkg/controller/volume/attachdetach/populator:go_default_library", @@ -62,6 +61,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", ], ) diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index 12bdb494562..f0e3774f5ca 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -43,7 +43,6 @@ import ( "k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" "k8s.io/klog" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator" @@ -343,7 +342,7 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) { synced = append(synced, adc.csiDriversSynced) } - if !controller.WaitForCacheSync("attach detach", stopCh, synced...) { + if !kcache.WaitForNamedCacheSync("attach detach", stopCh, synced...) { return } diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 80518e6f579..eaef4f3b1d0 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -21,11 +21,12 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" + kcache "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" @@ -179,7 +180,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2 informerFactory.Start(stopCh) - if !controller.WaitForCacheSync("attach detach", stopCh, + if !kcache.WaitForNamedCacheSync("attach detach", stopCh, informerFactory.Core().V1().Pods().Informer().HasSynced, informerFactory.Core().V1().Nodes().Informer().HasSynced) { t.Fatalf("Error waiting for the informer caches to sync") diff --git a/pkg/controller/volume/expand/BUILD b/pkg/controller/volume/expand/BUILD index cf81d3d99f8..11c7d30c94d 100644 --- a/pkg/controller/volume/expand/BUILD +++ b/pkg/controller/volume/expand/BUILD @@ -8,7 +8,6 @@ go_library( importpath = "k8s.io/kubernetes/pkg/controller/volume/expand", deps = [ "//pkg/apis/core/v1/helper:go_default_library", - "//pkg/controller:go_default_library", "//pkg/controller/volume/events:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index 7c3ac55ea64..2a1f04a3769 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -43,7 +43,6 @@ import ( cloudprovider "k8s.io/cloud-provider" csitranslation "k8s.io/csi-translation-lib" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/events" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/volume" @@ -301,7 +300,7 @@ func (expc *expandController) Run(stopCh <-chan struct{}) { klog.Infof("Starting expand controller") defer klog.Infof("Shutting down expand controller") - if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) { + if !cache.WaitForNamedCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) { return } diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go index adf53d8be83..bb7aa88b8ae 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_base.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go @@ -282,7 +282,7 @@ func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) { klog.Infof("Starting persistent volume controller") defer klog.Infof("Shutting down persistent volume controller") - if !controller.WaitForCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) { + if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) { return } diff --git a/pkg/controller/volume/pvcprotection/BUILD b/pkg/controller/volume/pvcprotection/BUILD index 1be72d6d342..b3097a7b1e6 100644 --- a/pkg/controller/volume/pvcprotection/BUILD +++ b/pkg/controller/volume/pvcprotection/BUILD @@ -6,7 +6,6 @@ go_library( importpath = "k8s.io/kubernetes/pkg/controller/volume/pvcprotection", visibility = ["//visibility:public"], deps = [ - "//pkg/controller:go_default_library", "//pkg/controller/volume/protectionutil:go_default_library", "//pkg/util/metrics:go_default_library", "//pkg/util/slice:go_default_library", diff --git a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go index b6f964cc69c..3e9938f3a33 100644 --- a/pkg/controller/volume/pvcprotection/pvc_protection_controller.go +++ b/pkg/controller/volume/pvcprotection/pvc_protection_controller.go @@ -32,7 +32,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/protectionutil" "k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/slice" @@ -101,7 +100,7 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting PVC protection controller") defer klog.Infof("Shutting down PVC protection controller") - if !controller.WaitForCacheSync("PVC protection", stopCh, c.pvcListerSynced, c.podListerSynced) { + if !cache.WaitForNamedCacheSync("PVC protection", stopCh, c.pvcListerSynced, c.podListerSynced) { return } diff --git a/pkg/controller/volume/pvprotection/BUILD b/pkg/controller/volume/pvprotection/BUILD index c1af6ae73e3..904dcbef69a 100644 --- a/pkg/controller/volume/pvprotection/BUILD +++ b/pkg/controller/volume/pvprotection/BUILD @@ -6,7 +6,6 @@ go_library( importpath = "k8s.io/kubernetes/pkg/controller/volume/pvprotection", visibility = ["//visibility:public"], deps = [ - "//pkg/controller:go_default_library", "//pkg/controller/volume/protectionutil:go_default_library", "//pkg/util/metrics:go_default_library", "//pkg/util/slice:go_default_library", diff --git a/pkg/controller/volume/pvprotection/pv_protection_controller.go b/pkg/controller/volume/pvprotection/pv_protection_controller.go index f892408c4f2..1fe836e3e2d 100644 --- a/pkg/controller/volume/pvprotection/pv_protection_controller.go +++ b/pkg/controller/volume/pvprotection/pv_protection_controller.go @@ -30,7 +30,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/protectionutil" "k8s.io/kubernetes/pkg/util/metrics" "k8s.io/kubernetes/pkg/util/slice" @@ -82,7 +81,7 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) { klog.Infof("Starting PV protection controller") defer klog.Infof("Shutting down PV protection controller") - if !controller.WaitForCacheSync("PV protection", stopCh, c.pvListerSynced) { + if !cache.WaitForNamedCacheSync("PV protection", stopCh, c.pvListerSynced) { return } diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD index 1b689f13157..49bd9b1d721 100644 --- a/pkg/kubemark/BUILD +++ b/pkg/kubemark/BUILD @@ -17,7 +17,6 @@ go_library( "//cmd/kube-proxy/app:go_default_library", "//cmd/kubelet/app:go_default_library", "//cmd/kubelet/app/options:go_default_library", - "//pkg/controller:go_default_library", "//pkg/kubelet:go_default_library", "//pkg/kubelet/apis/config:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", diff --git a/pkg/kubemark/controller.go b/pkg/kubemark/controller.go index 3ae38b2f5ef..d5ce070c7ab 100644 --- a/pkg/kubemark/controller.go +++ b/pkg/kubemark/controller.go @@ -31,7 +31,6 @@ import ( kubeclient "k8s.io/client-go/kubernetes" listersv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/controller" "k8s.io/klog" ) @@ -114,7 +113,7 @@ func NewKubemarkController(externalClient kubeclient.Interface, externalInformer // WaitForCacheSync waits until all caches in the controller are populated. func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool { - return controller.WaitForCacheSync("kubemark", stopCh, + return cache.WaitForNamedCacheSync("kubemark", stopCh, kubemarkController.externalCluster.rcSynced, kubemarkController.externalCluster.podSynced, kubemarkController.kubemarkCluster.nodeSynced) diff --git a/pkg/master/controller/crdregistration/BUILD b/pkg/master/controller/crdregistration/BUILD index a989520e105..3b8b341e772 100644 --- a/pkg/master/controller/crdregistration/BUILD +++ b/pkg/master/controller/crdregistration/BUILD @@ -11,7 +11,6 @@ go_library( srcs = ["crdregistration_controller.go"], importpath = "k8s.io/kubernetes/pkg/master/controller/crdregistration", deps = [ - "//pkg/controller:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion:go_default_library", diff --git a/pkg/master/controller/crdregistration/crdregistration_controller.go b/pkg/master/controller/crdregistration/crdregistration_controller.go index 9e453072d34..0afbfd91e25 100644 --- a/pkg/master/controller/crdregistration/crdregistration_controller.go +++ b/pkg/master/controller/crdregistration/crdregistration_controller.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" - "k8s.io/kubernetes/pkg/controller" ) // AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for @@ -113,7 +112,7 @@ func (c *crdRegistrationController) Run(threadiness int, stopCh <-chan struct{}) defer klog.Infof("Shutting down crd-autoregister controller") // wait for your secondary caches to fill before starting your work - if !controller.WaitForCacheSync("crd-autoregister", stopCh, c.crdSynced) { + if !cache.WaitForNamedCacheSync("crd-autoregister", stopCh, c.crdSynced) { return } diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index ccf8a0e4d16..353a30ae224 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -14,7 +14,6 @@ go_library( ], importpath = "k8s.io/kubernetes/pkg/proxy/config", deps = [ - "//pkg/controller:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 6d16a376a3d..97f78f110fe 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -25,7 +25,6 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog" - "k8s.io/kubernetes/pkg/controller" ) // ServiceHandler is an abstract interface of objects which receive @@ -95,7 +94,7 @@ func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) { func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { klog.Info("Starting endpoints config controller") - if !controller.WaitForCacheSync("endpoints config", stopCh, c.listerSynced) { + if !cache.WaitForNamedCacheSync("endpoints config", stopCh, c.listerSynced) { return } @@ -186,7 +185,7 @@ func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) { func (c *ServiceConfig) Run(stopCh <-chan struct{}) { klog.Info("Starting service config controller") - if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) { + if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) { return } diff --git a/staging/src/k8s.io/client-go/examples/workqueue/main.go b/staging/src/k8s.io/client-go/examples/workqueue/main.go index c306aaae00c..6cfa4ecd5dc 100644 --- a/staging/src/k8s.io/client-go/examples/workqueue/main.go +++ b/staging/src/k8s.io/client-go/examples/workqueue/main.go @@ -23,7 +23,7 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/runtime" diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 351c85b9031..c37423b6652 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -190,8 +190,24 @@ const ( initialBufferSize = 1024 ) +// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages +// indicating that the caller identified by name is waiting for syncs, followed by +// either a successful or failed sync. +func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { + klog.Infof("Waiting for caches to sync for %s", controllerName) + + if !WaitForCacheSync(stopCh, cacheSyncs...) { + utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName)) + return false + } + + klog.Infof("Caches are synced for %s ", controllerName) + return true +} + // WaitForCacheSync waits for caches to populate. It returns true if it was successful, false // if the controller should shutdown +// callers should prefer WaitForNamedCacheSync() func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { err := wait.PollUntil(syncedPollPeriod, func() (bool, error) { diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index 72432413c0a..a251fd7b98c 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -83,7 +83,6 @@ go_library( deps = [ "//pkg/api/legacyscheme:go_default_library", "//pkg/api/v1/pod:go_default_library", - "//pkg/controller:go_default_library", "//pkg/controller/disruption:go_default_library", "//pkg/scheduler:go_default_library", "//pkg/scheduler/algorithmprovider:go_default_library", @@ -111,6 +110,7 @@ go_library( "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/restmapper:go_default_library", "//staging/src/k8s.io/client-go/scale:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//test/integration/framework:go_default_library", "//test/utils/image:go_default_library", diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index cd6fdaadc1e..4793180a6d0 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -42,10 +42,10 @@ import ( restclient "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/scale" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" "k8s.io/kubernetes/pkg/api/legacyscheme" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/disruption" "k8s.io/kubernetes/pkg/scheduler" schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -221,7 +221,7 @@ func initTestSchedulerWithOptions( // set setPodInformer if provided. if setPodInformer { go podInformer.Informer().Run(context.schedulerConfig.StopEverything) - controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced) + cache.WaitForNamedCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced) } eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{