From d8ff4870cb5d55973e6fb5ee8c081cae30c83478 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Wed, 21 Sep 2016 19:11:00 +0530 Subject: [PATCH] Fix goroutine leak in federation service controller --- .../service/endpoint_helper.go | 46 +++++++++++++------ .../service/service_helper.go | 45 +++++++++++++----- .../service/servicecontroller.go | 17 +++++++ 3 files changed, 83 insertions(+), 25 deletions(-) diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go index 4202f190e70..07acd88d985 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -31,22 +31,42 @@ import ( // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func (sc *ServiceController) clusterEndpointWorker() { - fedClient := sc.federationClient + // process all pending events in endpointWorkerDoneChan + eventPending := true + for eventPending { + select { + case clusterName := <-sc.endpointWorkerDoneChan: + sc.endpointWorkerMap[clusterName] = false + default: + // non-blocking, comes here if all existing events are processed + eventPending = false + break + } + } + for clusterName, cache := range sc.clusterCache.clientMap { + workerExist, keyFound := sc.endpointWorkerMap[clusterName] + if keyFound && workerExist { + continue + } + sc.endpointWorkerMap[clusterName] = true + + // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { + fedClient := sc.federationClient for { - func() { - key, quit := cache.endpointQueue.Get() - // update endpoint cache - if quit { - return - } - defer cache.endpointQueue.Done(key) - err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.V(2).Infof("Failed to sync endpoint: %+v", err) - } - }() + key, quit := cache.endpointQueue.Get() + // update endpoint cache + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.endpointWorkerDoneChan <- clusterName + return + } + defer cache.endpointQueue.Done(key) + err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.V(2).Infof("Failed to sync endpoint: %+v", err) + } } }(cache, clusterName) } diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 7ccd844b980..8e59caa2ebc 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -35,21 +35,42 @@ import ( // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func (sc *ServiceController) clusterServiceWorker() { - fedClient := sc.federationClient + // process all pending events in serviceWorkerDoneChan + eventPending := true + for eventPending { + select { + case clusterName := <-sc.serviceWorkerDoneChan: + sc.serviceWorkerMap[clusterName] = false + default: + // non-blocking, comes here if all existing events are processed + eventPending = false + break + } + } + for clusterName, cache := range sc.clusterCache.clientMap { + workerExist, keyFound := sc.serviceWorkerMap[clusterName] + if keyFound && workerExist { + continue + } + sc.serviceWorkerMap[clusterName] = true + + // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { + fedClient := sc.federationClient for { - func() { - key, quit := cache.serviceQueue.Get() - defer cache.serviceQueue.Done(key) - if quit { - return - } - err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.Errorf("Failed to sync service: %+v", err) - } - }() + key, quit := cache.serviceQueue.Get() + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.serviceWorkerDoneChan <- clusterName + return + } + defer cache.serviceQueue.Done(key) + err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.Errorf("Failed to sync service: %+v", err) + } + } }(cache, clusterName) } diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index cb4757467e3..376aaee9cbe 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -67,6 +67,8 @@ const ( UserAgentName = "federation-service-controller" KubeAPIQPS = 20.0 KubeAPIBurst = 30 + + maxNoOfClusters = 256 ) type cachedService struct { @@ -118,6 +120,16 @@ type ServiceController struct { // services that need to be synced queue *workqueue.Type knownClusterSet sets.String + // endpoint worker map contains all the clusters registered with an indication that worker exist + // key clusterName + endpointWorkerMap map[string]bool + // channel for worker to signal that it is going out of existence + endpointWorkerDoneChan chan string + // service worker map contains all the clusters registered with an indication that worker exist + // key clusterName + serviceWorkerMap map[string]bool + // channel for worker to signal that it is going out of existence + serviceWorkerDoneChan chan string } // New returns a new service controller to keep DNS provider service resources @@ -204,6 +216,11 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte }, }, ) + + s.endpointWorkerMap = make(map[string]bool) + s.serviceWorkerMap = make(map[string]bool) + s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters) + s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters) return s }