From d8ff4870cb5d55973e6fb5ee8c081cae30c83478 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Wed, 21 Sep 2016 19:11:00 +0530 Subject: [PATCH 1/2] Fix goroutine leak in federation service controller --- .../service/endpoint_helper.go | 46 +++++++++++++------ .../service/service_helper.go | 45 +++++++++++++----- .../service/servicecontroller.go | 17 +++++++ 3 files changed, 83 insertions(+), 25 deletions(-) diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go index 4202f190e70..07acd88d985 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -31,22 +31,42 @@ import ( // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func (sc *ServiceController) clusterEndpointWorker() { - fedClient := sc.federationClient + // process all pending events in endpointWorkerDoneChan + eventPending := true + for eventPending { + select { + case clusterName := <-sc.endpointWorkerDoneChan: + sc.endpointWorkerMap[clusterName] = false + default: + // non-blocking, comes here if all existing events are processed + eventPending = false + break + } + } + for clusterName, cache := range sc.clusterCache.clientMap { + workerExist, keyFound := sc.endpointWorkerMap[clusterName] + if keyFound && workerExist { + continue + } + sc.endpointWorkerMap[clusterName] = true + + // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { + fedClient := sc.federationClient for { - func() { - key, quit := cache.endpointQueue.Get() - // update endpoint cache - if quit { - return - } - defer cache.endpointQueue.Done(key) - err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if 
err != nil { - glog.V(2).Infof("Failed to sync endpoint: %+v", err) - } - }() + key, quit := cache.endpointQueue.Get() + // update endpoint cache + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.endpointWorkerDoneChan <- clusterName + return + } + defer cache.endpointQueue.Done(key) + err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.V(2).Infof("Failed to sync endpoint: %+v", err) + } } }(cache, clusterName) } diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 7ccd844b980..8e59caa2ebc 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -35,21 +35,42 @@ import ( // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. 
func (sc *ServiceController) clusterServiceWorker() { - fedClient := sc.federationClient + // process all pending events in serviceWorkerDoneChan + eventPending := true + for eventPending { + select { + case clusterName := <-sc.serviceWorkerDoneChan: + sc.serviceWorkerMap[clusterName] = false + default: + // non-blocking, comes here if all existing events are processed + eventPending = false + break + } + } + for clusterName, cache := range sc.clusterCache.clientMap { + workerExist, keyFound := sc.serviceWorkerMap[clusterName] + if keyFound && workerExist { + continue + } + sc.serviceWorkerMap[clusterName] = true + + // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { + fedClient := sc.federationClient for { - func() { - key, quit := cache.serviceQueue.Get() - defer cache.serviceQueue.Done(key) - if quit { - return - } - err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.Errorf("Failed to sync service: %+v", err) - } - }() + key, quit := cache.serviceQueue.Get() + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.serviceWorkerDoneChan <- clusterName + return + } + defer cache.serviceQueue.Done(key) + err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.Errorf("Failed to sync service: %+v", err) + } + } }(cache, clusterName) } diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index cb4757467e3..376aaee9cbe 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -67,6 +67,8 @@ const ( UserAgentName = "federation-service-controller" KubeAPIQPS = 20.0 KubeAPIBurst = 30 + + maxNoOfClusters = 256 ) type 
cachedService struct { @@ -118,6 +120,16 @@ type ServiceController struct { // services that need to be synced queue *workqueue.Type knownClusterSet sets.String + // endpoint worker map contains all the clusters registered with an indication that a worker exists + // key clusterName + endpointWorkerMap map[string]bool + // channel for worker to signal that it is going out of existence + endpointWorkerDoneChan chan string + // service worker map contains all the clusters registered with an indication that a worker exists + // key clusterName + serviceWorkerMap map[string]bool + // channel for worker to signal that it is going out of existence + serviceWorkerDoneChan chan string } // New returns a new service controller to keep DNS provider service resources @@ -204,6 +216,11 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte }, }, ) + + s.endpointWorkerMap = make(map[string]bool) + s.serviceWorkerMap = make(map[string]bool) + s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters) + s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters) return s } From 690a06b9b8f7c0c76e5b68ff273a5f9443f7849e Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Fri, 23 Sep 2016 14:27:51 +0530 Subject: [PATCH 2/2] Handle review comments for Fix goroutine leak in federation service controller --- .../service/endpoint_helper.go | 39 ++++++++++--------- .../service/service_helper.go | 38 +++++++++--------- .../service/servicecontroller.go | 2 +- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/federation/pkg/federation-controller/service/endpoint_helper.go b/federation/pkg/federation-controller/service/endpoint_helper.go index 07acd88d985..1c86111615c 100644 --- a/federation/pkg/federation-controller/service/endpoint_helper.go +++ b/federation/pkg/federation-controller/service/endpoint_helper.go @@ -32,43 +32,44 @@ import ( // It enforces that the syncHandler is never invoked concurrently with the same key.
func (sc *ServiceController) clusterEndpointWorker() { // process all pending events in endpointWorkerDoneChan - eventPending := true - for eventPending { +ForLoop: + for { select { case clusterName := <-sc.endpointWorkerDoneChan: sc.endpointWorkerMap[clusterName] = false default: // non-blocking, comes here if all existing events are processed - eventPending = false - break + break ForLoop } } for clusterName, cache := range sc.clusterCache.clientMap { - workerExist, keyFound := sc.endpointWorkerMap[clusterName] - if keyFound && workerExist { + workerExist, found := sc.endpointWorkerMap[clusterName] + if found && workerExist { continue } - sc.endpointWorkerMap[clusterName] = true // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { fedClient := sc.federationClient for { - key, quit := cache.endpointQueue.Get() - // update endpoint cache - if quit { - // send signal that current worker has finished tasks and is going out of scope - sc.endpointWorkerDoneChan <- clusterName - return - } - defer cache.endpointQueue.Done(key) - err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.V(2).Infof("Failed to sync endpoint: %+v", err) - } + func() { + key, quit := cache.endpointQueue.Get() + // update endpoint cache + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.endpointWorkerDoneChan <- clusterName + return + } + defer cache.endpointQueue.Done(key) + err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.V(2).Infof("Failed to sync endpoint: %+v", err) + } + }() } }(cache, clusterName) + sc.endpointWorkerMap[clusterName] = true } } diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 8e59caa2ebc..3bf47e83ea5 
100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -36,43 +36,43 @@ import ( // It enforces that the syncHandler is never invoked concurrently with the same key. func (sc *ServiceController) clusterServiceWorker() { // process all pending events in serviceWorkerDoneChan - eventPending := true - for eventPending { +ForLoop: + for { select { case clusterName := <-sc.serviceWorkerDoneChan: sc.serviceWorkerMap[clusterName] = false default: // non-blocking, comes here if all existing events are processed - eventPending = false - break + break ForLoop } } for clusterName, cache := range sc.clusterCache.clientMap { - workerExist, keyFound := sc.serviceWorkerMap[clusterName] - if keyFound && workerExist { + workerExist, found := sc.serviceWorkerMap[clusterName] + if found && workerExist { continue } - sc.serviceWorkerMap[clusterName] = true // create a worker only if the previous worker has finished and gone out of scope go func(cache *clusterCache, clusterName string) { fedClient := sc.federationClient for { - key, quit := cache.serviceQueue.Get() - if quit { - // send signal that current worker has finished tasks and is going out of scope - sc.serviceWorkerDoneChan <- clusterName - return - } - defer cache.serviceQueue.Done(key) - err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) - if err != nil { - glog.Errorf("Failed to sync service: %+v", err) - } - + func() { + key, quit := cache.serviceQueue.Get() + if quit { + // send signal that current worker has finished tasks and is going out of scope + sc.serviceWorkerDoneChan <- clusterName + return + } + defer cache.serviceQueue.Done(key) + err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc) + if err != nil { + glog.Errorf("Failed to sync service: %+v", err) + } + }() } }(cache, clusterName) + sc.serviceWorkerMap[clusterName] = 
true } } diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 376aaee9cbe..291c9b80741 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -68,7 +68,7 @@ const ( KubeAPIQPS = 20.0 KubeAPIBurst = 30 - maxNoOfClusters = 256 + maxNoOfClusters = 100 ) type cachedService struct {