move WaitForCacheSync to the sharedInformer package

Signed-off-by: Yassine TIJANI <ytijani@vmware.com>
Author: Yassine TIJANI <ytijani@vmware.com>
Date: 2019-08-16 21:15:53 +02:00
Parent: 9b54021c65
Commit: 7e4c3096fe
61 changed files with 72 additions and 96 deletions
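
For downstream callers the migration is mechanical: drop the import of k8s.io/kubernetes/pkg/controller and call the relocated helper from k8s.io/client-go/tools/cache instead. A minimal sketch of the before/after call site (the controller name and sync functions are illustrative, not from this commit):

	import (
		"k8s.io/client-go/tools/cache"
	)

	func run(stopCh <-chan struct{}, synced ...cache.InformerSynced) {
		// before: controller.WaitForCacheSync("example", stopCh, synced...)
		// after: the same wrapper now lives in client-go's cache package
		if !cache.WaitForNamedCacheSync("example", stopCh, synced...) {
			return // stop was requested before the caches populated
		}
		// start workers here
	}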

View File

@@ -74,7 +74,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
-"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@@ -37,7 +37,6 @@ import (
bootstrapapi "k8s.io/cluster-bootstrap/token/api"
jws "k8s.io/cluster-bootstrap/token/jws"
api "k8s.io/kubernetes/pkg/apis/core"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
)
@@ -159,7 +158,7 @@ func (e *Signer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer e.syncQueue.ShutDown()
-if !controller.WaitForCacheSync("bootstrap_signer", stopCh, e.configMapSynced, e.secretSynced) {
+if !cache.WaitForNamedCacheSync("bootstrap_signer", stopCh, e.configMapSynced, e.secretSynced) {
return
}

View File

@@ -117,7 +117,7 @@ func (tc *TokenCleaner) Run(stopCh <-chan struct{}) {
klog.Infof("Starting token cleaner controller")
defer klog.Infof("Shutting down token cleaner controller")
-if !controller.WaitForCacheSync("token_cleaner", stopCh, tc.secretSynced) {
+if !cache.WaitForNamedCacheSync("token_cleaner", stopCh, tc.secretSynced) {
return
}

View File

@@ -113,7 +113,7 @@ func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting certificate controller")
defer klog.Infof("Shutting down certificate controller")
-if !controller.WaitForCacheSync("certificate", stopCh, cc.csrsSynced) {
+if !cache.WaitForNamedCacheSync("certificate", stopCh, cc.csrsSynced) {
return
}

View File

@@ -6,7 +6,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/certificates/rootcacertpublisher",
visibility = ["//visibility:public"],
deps = [
-"//pkg/controller:go_default_library",
"//pkg/util/metrics:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",

View File

@@ -21,7 +21,7 @@ import (
"reflect"
"time"
-"k8s.io/api/core/v1"
+v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -32,7 +32,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
)
@@ -98,7 +97,7 @@ func (c *Publisher) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting root CA certificate configmap publisher")
defer klog.Infof("Shutting down root CA certificate configmap publisher")
-if !controller.WaitForCacheSync("crt configmap", stopCh, c.cmListerSynced) {
+if !cache.WaitForNamedCacheSync("crt configmap", stopCh, c.cmListerSynced) {
return
}

View File

@@ -148,7 +148,7 @@ func (c *ClusterRoleAggregationController) Run(workers int, stopCh <-chan struct
klog.Infof("Starting ClusterRoleAggregator")
defer klog.Infof("Shutting down ClusterRoleAggregator")
-if !controller.WaitForCacheSync("ClusterRoleAggregator", stopCh, c.clusterRolesSynced) {
+if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", stopCh, c.clusterRolesSynced) {
return
}

View File

@@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/rand"
-utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
@@ -1022,21 +1021,6 @@ func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, n
return err
}
-// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
-// indicating that the controller identified by controllerName is waiting for syncs, followed by
-// either a successful or failed sync.
-func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
-klog.Infof("Waiting for caches to sync for %s controller", controllerName)
-if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
-utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s controller", controllerName))
-return false
-}
-klog.Infof("Caches are synced for %s controller", controllerName)
-return true
-}
// ComputeHash returns a hash value calculated from pod template and
// a collisionCount to avoid hash collision. The hash will be safe encoded to
// avoid bad words.

View File

@@ -267,7 +267,7 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting daemon sets controller")
defer klog.Infof("Shutting down daemon sets controller")
-if !controller.WaitForCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
+if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
return
}

View File

@@ -152,7 +152,7 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting deployment controller")
defer klog.Infof("Shutting down deployment controller")
-if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
+if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
return
}

View File

@@ -333,7 +333,7 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
klog.Infof("Starting disruption controller")
defer klog.Infof("Shutting down disruption controller")
-if !controller.WaitForCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
+if !cache.WaitForNamedCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
return
}

View File

@@ -170,7 +170,7 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting endpoint controller")
defer klog.Infof("Shutting down endpoint controller")
-if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
+if !cache.WaitForNamedCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
return
}

View File

@@ -749,7 +749,7 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) {
defer close(stopCh)
go endpoints.Run(1, stopCh)
-// cache.WaitForCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
+// cache.WaitForNamedCacheSync has a 100ms poll period, and the endpoints worker has a 10ms period.
// To ensure we get all updates, including unexpected ones, we need to wait at least as long as
// a single cache sync period and worker period, with some fudge room.
time.Sleep(150 * time.Millisecond)

View File

@@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/metadata"
+"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
@@ -131,7 +132,7 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
go gc.dependencyGraphBuilder.Run(stopCh)
-if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
+if !cache.WaitForNamedCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
return
}
@@ -225,7 +226,7 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
// note that workers stay paused until we successfully resync.
-if !controller.WaitForCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
+if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
return false, nil
}
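
The bounded wait above works because waitForStopOrTimeout derives a stop channel that closes on either the real stop signal or a timeout, so the cache-sync wait cannot block forever. A minimal sketch of that pattern (the helper is unexported in the garbage collector package; this shape is an assumption, not quoted from the commit):

	// waitForStopOrTimeout returns a channel that closes when stopCh closes
	// or the timeout elapses, whichever happens first.
	func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
		stopChWithTimeout := make(chan struct{})
		go func() {
			defer close(stopChWithTimeout)
			select {
			case <-stopCh:
			case <-time.After(timeout):
			}
		}()
		return stopChWithTimeout
	}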

View File

@@ -837,7 +837,7 @@ func TestGarbageCollectorSync(t *testing.T) {
// wait.PollImmediateUntil() loops with 100ms (hardcode) util the `stopCh` is closed:
// GetDeletableResources()
// gc.resyncMonitors()
-// controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced.
+// cache.WaitForNamedCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced.
//
// Setting the period to 200ms allows the WaitForCacheSync() to check
// for cache sync ~2 times in every wait.PollImmediateUntil() loop.

View File

@@ -143,7 +143,7 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting job controller")
defer klog.Infof("Shutting down job controller")
-if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
+if !cache.WaitForNamedCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
return
}

View File

@@ -186,7 +186,7 @@ func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting namespace controller")
defer klog.Infof("Shutting down namespace controller")
-if !controller.WaitForCacheSync("namespace", stopCh, nm.listerSynced) {
+if !cache.WaitForNamedCacheSync("namespace", stopCh, nm.listerSynced) {
return
}

View File

@@ -31,7 +31,6 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/controller/nodeipam",
deps = [
-"//pkg/controller:go_default_library",
"//pkg/controller/nodeipam/ipam:go_default_library",
"//pkg/controller/nodeipam/ipam/sync:go_default_library",
"//pkg/util/metrics:go_default_library",

View File

@@ -42,7 +42,6 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/controller/nodeipam/ipam",
deps = [
-"//pkg/controller:go_default_library",
"//pkg/controller/nodeipam/ipam/cidrset:go_default_library",
"//pkg/controller/nodeipam/ipam/sync:go_default_library",
"//pkg/controller/util/node:go_default_library",

View File

@@ -39,7 +39,6 @@ import (
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
cloudprovider "k8s.io/cloud-provider"
-"k8s.io/kubernetes/pkg/controller"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
utilnode "k8s.io/kubernetes/pkg/util/node"
@@ -136,7 +135,7 @@ func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) {
klog.Infof("Starting cloud CIDR allocator")
defer klog.Infof("Shutting down cloud CIDR allocator")
-if !controller.WaitForCacheSync("cidrallocator", stopCh, ca.nodesSynced) {
+if !cache.WaitForNamedCacheSync("cidrallocator", stopCh, ca.nodesSynced) {
return
}

View File

@@ -35,7 +35,6 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
utilnode "k8s.io/kubernetes/pkg/util/node"
@@ -167,7 +166,7 @@ func (r *rangeAllocator) Run(stopCh <-chan struct{}) {
klog.Infof("Starting range CIDR allocator")
defer klog.Infof("Shutting down range CIDR allocator")
-if !controller.WaitForCacheSync("cidrallocator", stopCh, r.nodesSynced) {
+if !cache.WaitForNamedCacheSync("cidrallocator", stopCh, r.nodesSynced) {
return
}

View File

@@ -33,7 +33,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
cloudprovider "k8s.io/cloud-provider"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
"k8s.io/kubernetes/pkg/util/metrics"
@@ -179,7 +178,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
klog.Infof("Starting ipam controller")
defer klog.Infof("Shutting down ipam controller")
-if !controller.WaitForCacheSync("node", stopCh, nc.nodeInformerSynced) {
+if !cache.WaitForNamedCacheSync("node", stopCh, nc.nodeInformerSynced) {
return
}

View File

@@ -455,7 +455,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
klog.Infof("Starting node controller")
defer klog.Infof("Shutting down node controller")
-if !controller.WaitForCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
+if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}

View File

@@ -78,6 +78,7 @@ go_test(
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/scale/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
+"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/metrics/pkg/apis/custom_metrics/v1beta2:go_default_library",
"//staging/src/k8s.io/metrics/pkg/apis/external_metrics/v1beta1:go_default_library",
"//staging/src/k8s.io/metrics/pkg/apis/metrics/v1alpha1:go_default_library",

View File

@@ -156,7 +156,7 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) {
klog.Infof("Starting HPA controller")
defer klog.Infof("Shutting down HPA controller")
-if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
+if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
return
}

View File

@@ -33,6 +33,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
+"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
@@ -196,7 +197,7 @@ func (tc *legacyReplicaCalcTestCase) runTest(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
-if !controller.WaitForCacheSync("HPA", stop, informer.Informer().HasSynced) {
+if !cache.WaitForNamedCacheSync("HPA", stop, informer.Informer().HasSynced) {
return
}

View File

@@ -34,6 +34,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
+"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controller"
metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
@@ -349,7 +350,7 @@ func (tc *replicaCalcTestCase) runTest(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
-if !controller.WaitForCacheSync("HPA", stop, informer.Informer().HasSynced) {
+if !cache.WaitForNamedCacheSync("HPA", stop, informer.Informer().HasSynced) {
return
}

View File

@@ -14,7 +14,6 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/controller/podgc",
deps = [
-"//pkg/controller:go_default_library",
"//pkg/util/metrics:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@@ -31,7 +31,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/klog"
@@ -76,7 +75,7 @@ func (gcc *PodGCController) Run(stop <-chan struct{}) {
klog.Infof("Starting GC controller")
defer klog.Infof("Shutting down GC controller")
-if !controller.WaitForCacheSync("GC", stop, gcc.podListerSynced) {
+if !cache.WaitForNamedCacheSync("GC", stop, gcc.podListerSynced) {
return
}

View File

@@ -182,7 +182,7 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting %v controller", controllerName)
defer klog.Infof("Shutting down %v controller", controllerName)
-if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
+if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
return
}

View File

@@ -275,7 +275,7 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
go rq.quotaMonitor.Run(stopCh)
}
-if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
+if !cache.WaitForNamedCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
return
}
@@ -442,7 +442,7 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p
// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
-if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
+if rq.quotaMonitor != nil && !cache.WaitForNamedCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
return
}

View File

@@ -1066,7 +1066,7 @@ func TestDiscoverySync(t *testing.T) {
// wait.Until() loops with `period` until the `stopCh` is closed :
// GetQuotableResources()
// resyncMonitors()
-// controller.WaitForCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced.
+// cache.WaitForNamedCacheSync() loops with `syncedPollPeriod` (hardcoded to 100ms), until either its stop channel is closed after `period`, or all caches synced.
//
// Setting the period to 200ms allows the WaitForCacheSync() to check
// for cache sync ~2 times in every wait.Until() loop.

View File

@@ -14,7 +14,6 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/controller/route",
deps = [
-"//pkg/controller:go_default_library",
"//pkg/controller/util/node:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/node:go_default_library",

View File

@@ -40,7 +40,6 @@ import (
"k8s.io/client-go/tools/record"
clientretry "k8s.io/client-go/util/retry"
cloudprovider "k8s.io/cloud-provider"
-"k8s.io/kubernetes/pkg/controller"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/kubernetes/pkg/util/metrics"
utilnode "k8s.io/kubernetes/pkg/util/node"
@@ -102,7 +101,7 @@ func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration)
klog.Info("Starting route controller")
defer klog.Info("Shutting down route controller")
-if !controller.WaitForCacheSync("route", stopCh, rc.nodeListerSynced) {
+if !cache.WaitForNamedCacheSync("route", stopCh, rc.nodeListerSynced) {
return
}

View File

@@ -197,7 +197,7 @@ func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
klog.Info("Starting service controller")
defer klog.Info("Shutting down service controller")
-if !controller.WaitForCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced) {
+if !cache.WaitForNamedCacheSync("service", stopCh, s.serviceListerSynced, s.nodeListerSynced) {
return
}

View File

@@ -16,7 +16,6 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/controller/serviceaccount",
deps = [
-"//pkg/controller:go_default_library",
"//pkg/registry/core/secret:go_default_library",
"//pkg/serviceaccount:go_default_library",
"//pkg/util/metrics:go_default_library",

View File

@@ -32,7 +32,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
)
@@ -117,7 +116,7 @@ func (c *ServiceAccountsController) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting service account controller")
defer klog.Infof("Shutting down service account controller")
-if !controller.WaitForCacheSync("service account", stopCh, c.saListerSynced, c.nsListerSynced) {
+if !cache.WaitForNamedCacheSync("service account", stopCh, c.saListerSynced, c.nsListerSynced) {
return
}

View File

@@ -36,7 +36,6 @@ import (
clientretry "k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/registry/core/secret"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/metrics"
@@ -169,7 +168,7 @@ func (e *TokensController) Run(workers int, stopCh <-chan struct{}) {
defer e.syncServiceAccountQueue.ShutDown()
defer e.syncSecretQueue.ShutDown()
-if !controller.WaitForCacheSync("tokens", stopCh, e.serviceAccountSynced, e.secretSynced) {
+if !cache.WaitForNamedCacheSync("tokens", stopCh, e.serviceAccountSynced, e.secretSynced) {
return
}

View File

@@ -145,7 +145,7 @@ func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting stateful set controller")
defer klog.Infof("Shutting down statefulset controller")
-if !controller.WaitForCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
+if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
return
}

View File

@@ -116,7 +116,7 @@ func (ttlc *TTLController) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting TTL controller")
defer klog.Infof("Shutting down TTL controller")
-if !controller.WaitForCacheSync("TTL", stopCh, ttlc.hasSynced) {
+if !cache.WaitForNamedCacheSync("TTL", stopCh, ttlc.hasSynced) {
return
}

View File

@@ -105,7 +105,7 @@ func (tc *Controller) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting TTL after finished controller")
defer klog.Infof("Shutting down TTL after finished controller")
-if !controller.WaitForCacheSync("TTL after finished", stopCh, tc.jListerSynced) {
+if !cache.WaitForNamedCacheSync("TTL after finished", stopCh, tc.jListerSynced) {
return
}

View File

@@ -11,7 +11,6 @@ go_library(
srcs = ["attach_detach_controller.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach",
deps = [
-"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/metrics:go_default_library",
"//pkg/controller/volume/attachdetach/populator:go_default_library",
@@ -62,6 +61,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
+"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@@ -43,7 +43,6 @@ import (
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator"
@@ -343,7 +342,7 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
synced = append(synced, adc.csiDriversSynced)
}
-if !controller.WaitForCacheSync("attach detach", stopCh, synced...) {
+if !kcache.WaitForNamedCacheSync("attach detach", stopCh, synced...) {
return
}

View File

@@ -21,11 +21,12 @@ import (
"testing"
"time"
-"k8s.io/api/core/v1"
+v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
+kcache "k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
@@ -179,7 +180,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
informerFactory.Start(stopCh)
-if !controller.WaitForCacheSync("attach detach", stopCh,
+if !kcache.WaitForNamedCacheSync("attach detach", stopCh,
informerFactory.Core().V1().Pods().Informer().HasSynced,
informerFactory.Core().V1().Nodes().Informer().HasSynced) {
t.Fatalf("Error waiting for the informer caches to sync")

View File

@@ -8,7 +8,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/expand",
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
-"//pkg/controller:go_default_library",
"//pkg/controller/volume/events:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",

View File

@@ -43,7 +43,6 @@ import (
cloudprovider "k8s.io/cloud-provider"
csitranslation "k8s.io/csi-translation-lib"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
@@ -301,7 +300,7 @@ func (expc *expandController) Run(stopCh <-chan struct{}) {
klog.Infof("Starting expand controller")
defer klog.Infof("Shutting down expand controller")
-if !controller.WaitForCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) {
+if !cache.WaitForNamedCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced, expc.classListerSynced) {
return
}

View File

@@ -282,7 +282,7 @@ func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
klog.Infof("Starting persistent volume controller")
defer klog.Infof("Shutting down persistent volume controller")
-if !controller.WaitForCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
+if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
return
}

View File

@@ -6,7 +6,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvcprotection",
visibility = ["//visibility:public"],
deps = [
-"//pkg/controller:go_default_library",
"//pkg/controller/volume/protectionutil:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/slice:go_default_library",

View File

@@ -32,7 +32,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/util/slice"
@@ -101,7 +100,7 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting PVC protection controller")
defer klog.Infof("Shutting down PVC protection controller")
-if !controller.WaitForCacheSync("PVC protection", stopCh, c.pvcListerSynced, c.podListerSynced) {
+if !cache.WaitForNamedCacheSync("PVC protection", stopCh, c.pvcListerSynced, c.podListerSynced) {
return
}

View File

@@ -6,7 +6,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvprotection",
visibility = ["//visibility:public"],
deps = [
-"//pkg/controller:go_default_library",
"//pkg/controller/volume/protectionutil:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/slice:go_default_library",

View File

@@ -30,7 +30,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/pkg/util/slice"
@@ -82,7 +81,7 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
klog.Infof("Starting PV protection controller")
defer klog.Infof("Shutting down PV protection controller")
-if !controller.WaitForCacheSync("PV protection", stopCh, c.pvListerSynced) {
+if !cache.WaitForNamedCacheSync("PV protection", stopCh, c.pvListerSynced) {
return
}

View File

@@ -17,7 +17,6 @@ go_library(
"//cmd/kube-proxy/app:go_default_library",
"//cmd/kubelet/app:go_default_library",
"//cmd/kubelet/app/options:go_default_library",
-"//pkg/controller:go_default_library",
"//pkg/kubelet:go_default_library",
"//pkg/kubelet/apis/config:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",

View File

@@ -31,7 +31,6 @@ import (
kubeclient "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/klog"
)
@@ -114,7 +113,7 @@ func NewKubemarkController(externalClient kubeclient.Interface, externalInformer
// WaitForCacheSync waits until all caches in the controller are populated.
func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
-return controller.WaitForCacheSync("kubemark", stopCh,
+return cache.WaitForNamedCacheSync("kubemark", stopCh,
kubemarkController.externalCluster.rcSynced,
kubemarkController.externalCluster.podSynced,
kubemarkController.kubemarkCluster.nodeSynced)

View File

@@ -11,7 +11,6 @@ go_library(
srcs = ["crdregistration_controller.go"],
importpath = "k8s.io/kubernetes/pkg/master/controller/crdregistration",
deps = [
-"//pkg/controller:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion:go_default_library",

View File

@@ -33,7 +33,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
-"k8s.io/kubernetes/pkg/controller"
)
// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
@@ -113,7 +112,7 @@ func (c *crdRegistrationController) Run(threadiness int, stopCh <-chan struct{})
defer klog.Infof("Shutting down crd-autoregister controller")
// wait for your secondary caches to fill before starting your work
-if !controller.WaitForCacheSync("crd-autoregister", stopCh, c.crdSynced) {
+if !cache.WaitForNamedCacheSync("crd-autoregister", stopCh, c.crdSynced) {
return
}

View File

@@ -14,7 +14,6 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/proxy/config",
deps = [
-"//pkg/controller:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",

View File

@@ -25,7 +25,6 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
-"k8s.io/kubernetes/pkg/controller"
)
// ServiceHandler is an abstract interface of objects which receive
@@ -95,7 +94,7 @@ func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
klog.Info("Starting endpoints config controller")
-if !controller.WaitForCacheSync("endpoints config", stopCh, c.listerSynced) {
+if !cache.WaitForNamedCacheSync("endpoints config", stopCh, c.listerSynced) {
return
}
@@ -186,7 +185,7 @@ func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
klog.Info("Starting service config controller")
-if !controller.WaitForCacheSync("service config", stopCh, c.listerSynced) {
+if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
return
}

View File

@@ -23,7 +23,7 @@ import (
"k8s.io/klog"
-"k8s.io/api/core/v1"
+v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"

View File

@@ -190,8 +190,24 @@ const (
initialBufferSize = 1024
)
+// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
+// indicating that the caller identified by name is waiting for syncs, followed by
+// either a successful or failed sync.
+func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
+klog.Infof("Waiting for caches to sync for %s", controllerName)
+if !WaitForCacheSync(stopCh, cacheSyncs...) {
+utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName))
+return false
+}
+klog.Infof("Caches are synced for %s ", controllerName)
+return true
+}
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
+// callers should prefer WaitForNamedCacheSync()
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
err := wait.PollUntil(syncedPollPeriod,
func() (bool, error) {

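A typical use of the relocated helper with a shared informer factory looks roughly like this (client construction and all names are illustrative, not from this commit):

	factory := informers.NewSharedInformerFactory(client, 0)
	podsSynced := factory.Core().V1().Pods().Informer().HasSynced
	factory.Start(stopCh)
	// one log line on entry and one on success or failure, keyed by name
	if !cache.WaitForNamedCacheSync("example-controller", stopCh, podsSynced) {
		return // the stop channel closed before the caches synced
	}
	// listers are now safe to read
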
View File

@@ -83,7 +83,6 @@ go_library(
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/v1/pod:go_default_library",
-"//pkg/controller:go_default_library",
"//pkg/controller/disruption:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
@@ -111,6 +110,7 @@ go_library(
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/restmapper:go_default_library",
"//staging/src/k8s.io/client-go/scale:go_default_library",
+"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//test/integration/framework:go_default_library",
"//test/utils/image:go_default_library",

View File

@@ -42,10 +42,10 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
+"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
-"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/scheduler"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -221,7 +221,7 @@ func initTestSchedulerWithOptions(
// set setPodInformer if provided.
if setPodInformer {
go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
-controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced)
+cache.WaitForNamedCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced)
}
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{