diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go
index 4202f190e70..1c86111615c 100644
--- a/federation/pkg/federation-controller/service/endpoint_helper.go
+++ b/federation/pkg/federation-controller/service/endpoint_helper.go
@@ -31,14 +31,39 @@ import (
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (sc *ServiceController) clusterEndpointWorker() {
-	fedClient := sc.federationClient
+	// process all pending events in endpointWorkerDoneChan
+ForLoop:
+	for {
+		select {
+		case clusterName := <-sc.endpointWorkerDoneChan:
+			sc.endpointWorkerMap[clusterName] = false
+		default:
+			// non-blocking, comes here if all existing events are processed
+			break ForLoop
+		}
+	}
+
 	for clusterName, cache := range sc.clusterCache.clientMap {
+		workerExist, found := sc.endpointWorkerMap[clusterName]
+		if found && workerExist {
+			continue
+		}
+
+		// create a worker only if the previous worker has finished and gone out of scope
 		go func(cache *clusterCache, clusterName string) {
-			for {
+			fedClient := sc.federationClient
+			// stopped ends the loop once Get reports quit: the return below only
+			// leaves the closure, so without this flag the goroutine would keep
+			// looping and re-send the done signal on every iteration.
+			stopped := false
+			for !stopped {
 				func() {
 					key, quit := cache.endpointQueue.Get()
 					// update endpoint cache
 					if quit {
+						// send signal that current worker has finished tasks and is going out of scope
+						sc.endpointWorkerDoneChan <- clusterName
+						stopped = true
 						return
 					}
 					defer cache.endpointQueue.Done(key)
@@ -49,6 +74,7 @@ func (sc *ServiceController) clusterEndpointWorker() {
 				}()
 			}
 		}(cache, clusterName)
+		sc.endpointWorkerMap[clusterName] = true
 	}
 }
 
diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go
index bb0178afffd..c8316cedd5b 100644
--- a/federation/pkg/federation-controller/service/service_helper.go
+++ b/federation/pkg/federation-controller/service/service_helper.go
@@ -35,16 +35,41 @@ import (
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (sc *ServiceController) clusterServiceWorker() {
-	fedClient := sc.federationClient
+	// process all pending events in serviceWorkerDoneChan
+ForLoop:
+	for {
+		select {
+		case clusterName := <-sc.serviceWorkerDoneChan:
+			sc.serviceWorkerMap[clusterName] = false
+		default:
+			// non-blocking, comes here if all existing events are processed
+			break ForLoop
+		}
+	}
+
 	for clusterName, cache := range sc.clusterCache.clientMap {
+		workerExist, found := sc.serviceWorkerMap[clusterName]
+		if found && workerExist {
+			continue
+		}
+
+		// create a worker only if the previous worker has finished and gone out of scope
 		go func(cache *clusterCache, clusterName string) {
-			for {
+			fedClient := sc.federationClient
+			// stopped ends the loop once Get reports quit: the return below only
+			// leaves the closure, so without this flag the goroutine would keep
+			// looping and re-send the done signal on every iteration.
+			stopped := false
+			for !stopped {
 				func() {
 					key, quit := cache.serviceQueue.Get()
-					defer cache.serviceQueue.Done(key)
 					if quit {
+						// send signal that current worker has finished tasks and is going out of scope
+						sc.serviceWorkerDoneChan <- clusterName
+						stopped = true
 						return
 					}
+					defer cache.serviceQueue.Done(key)
 					err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
 					if err != nil {
 						glog.Errorf("Failed to sync service: %+v", err)
@@ -52,6 +77,7 @@ func (sc *ServiceController) clusterServiceWorker() {
 				}()
 			}
 		}(cache, clusterName)
+		sc.serviceWorkerMap[clusterName] = true
 	}
 }
 
diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go
index 7baefaf67ab..c5ddb5b748a 100644
--- a/federation/pkg/federation-controller/service/servicecontroller.go
+++ b/federation/pkg/federation-controller/service/servicecontroller.go
@@ -67,6 +67,8 @@ const (
 	UserAgentName = "federation-service-controller"
 	KubeAPIQPS    = 20.0
 	KubeAPIBurst  = 30
+
+	maxNoOfClusters = 100
 )
 
 type cachedService struct {
@@ -118,6 +120,16 @@ type ServiceController struct {
 	// services that need to be synced
 	queue           *workqueue.Type
 	knownClusterSet sets.String
+	// endpointWorkerMap records, per cluster, whether an endpoint worker currently exists
+	// key clusterName
+	endpointWorkerMap map[string]bool
+	// channel on which an endpoint worker sends its cluster name when it stops
+	endpointWorkerDoneChan chan string
+	// serviceWorkerMap records, per cluster, whether a service worker currently exists
+	// key clusterName
+	serviceWorkerMap map[string]bool
+	// channel on which a service worker sends its cluster name when it stops
+	serviceWorkerDoneChan chan string
 }
 
 // New returns a new service controller to keep DNS provider service resources
@@ -205,6 +217,11 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte
 			},
 		},
 	)
+
+	s.endpointWorkerMap = make(map[string]bool)
+	s.serviceWorkerMap = make(map[string]bool)
+	s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters)
+	s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters)
 	return s
 }
 