diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go index 07acd88d985..1c86111615c 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -32,43 +32,44 @@ import ( // It enforces that the syncHandler is never invoked concurrently with the same key. func (sc *ServiceController) clusterEndpointWorker() { // process all pending events in endpointWorkerDoneChan - eventPending := true - for eventPending { +ForLoop: + for { select { case clusterName := <-sc.endpointWorkerDoneChan: sc.endpointWorkerMap[clusterName] = false default: // non-blocking, comes here if all existing events are processed - eventPending = false - break + break ForLoop } } for clusterName, cache := range sc.clusterCache.clientMap { - workerExist, keyFound := sc.endpointWorkerMap[clusterName] - if keyFound && workerExist { + workerExist, found := sc.endpointWorkerMap[clusterName] + if found && workerExist { continue } - sc.endpointWorkerMap[clusterName] = true // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { fedClient := sc.federationClient for { - key, quit := cache.endpointQueue.Get() - // update endpoint cache - if quit { - // send signal that current worker has finished tasks and is going out of scope - sc.endpointWorkerDoneChan <- clusterName - return - } - defer cache.endpointQueue.Done(key) - err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.V(2).Infof("Failed to sync endpoint: %+v", err) - } + func() { + key, quit := cache.endpointQueue.Get() + // update endpoint cache + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.endpointWorkerDoneChan <- clusterName + return + } + defer cache.endpointQueue.Done(key) + err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.V(2).Infof("Failed to sync endpoint: %+v", err) + } + }() } }(cache, clusterName) + sc.endpointWorkerMap[clusterName] = true } } diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 8e59caa2ebc..3bf47e83ea5 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -36,43 +36,43 @@ import ( // It enforces that the syncHandler is never invoked concurrently with the same key. func (sc *ServiceController) clusterServiceWorker() { // process all pending events in serviceWorkerDoneChan - eventPending := true - for eventPending { +ForLoop: + for { select { case clusterName := <-sc.serviceWorkerDoneChan: sc.serviceWorkerMap[clusterName] = false default: // non-blocking, comes here if all existing events are processed - eventPending = false - break + break ForLoop } } for clusterName, cache := range sc.clusterCache.clientMap { - workerExist, keyFound := sc.serviceWorkerMap[clusterName] - if keyFound && workerExist { + workerExist, found := sc.serviceWorkerMap[clusterName] + if found && workerExist { continue } - sc.serviceWorkerMap[clusterName] = true // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { fedClient := sc.federationClient for { - key, quit := cache.serviceQueue.Get() - if quit { - // send signal that current worker has finished tasks and is going out of scope - sc.serviceWorkerDoneChan <- clusterName - return - } - defer cache.serviceQueue.Done(key) - err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.Errorf("Failed to sync service: %+v", err) - } - + func() { + key, quit := cache.serviceQueue.Get() + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.serviceWorkerDoneChan <- clusterName + return + } + defer cache.serviceQueue.Done(key) + err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.Errorf("Failed to sync service: %+v", err) + } + }() } }(cache, clusterName) + sc.serviceWorkerMap[clusterName] = true } } diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 376aaee9cbe..291c9b80741 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -68,7 +68,7 @@ const ( KubeAPIQPS = 20.0 KubeAPIBurst = 30 - maxNoOfClusters = 256 + maxNoOfClusters = 100 ) type cachedService struct {