Handle review comments for Fix goroutine leak in federation service controller

shashidharatd 2016-09-23 14:27:51 +05:30
parent d8ff4870cb
commit 690a06b9b8
3 changed files with 40 additions and 39 deletions


@@ -32,43 +32,44 @@ import (
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (sc *ServiceController) clusterEndpointWorker() {
 	// process all pending events in endpointWorkerDoneChan
-	eventPending := true
-	for eventPending {
+ForLoop:
+	for {
 		select {
 		case clusterName := <-sc.endpointWorkerDoneChan:
 			sc.endpointWorkerMap[clusterName] = false
 		default:
 			// non-blocking, comes here if all existing events are processed
-			eventPending = false
-			break
+			break ForLoop
 		}
 	}
 	for clusterName, cache := range sc.clusterCache.clientMap {
-		workerExist, keyFound := sc.endpointWorkerMap[clusterName]
-		if keyFound && workerExist {
+		workerExist, found := sc.endpointWorkerMap[clusterName]
+		if found && workerExist {
 			continue
 		}
-		sc.endpointWorkerMap[clusterName] = true
 		// create a worker only if the previous worker has finished and gone out of scope
 		go func(cache *clusterCache, clusterName string) {
 			fedClient := sc.federationClient
 			for {
-				key, quit := cache.endpointQueue.Get()
-				// update endpoint cache
-				if quit {
-					// send signal that current worker has finished tasks and is going out of scope
-					sc.endpointWorkerDoneChan <- clusterName
-					return
-				}
-				defer cache.endpointQueue.Done(key)
-				err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
-				if err != nil {
-					glog.V(2).Infof("Failed to sync endpoint: %+v", err)
-				}
+				func() {
+					key, quit := cache.endpointQueue.Get()
+					// update endpoint cache
+					if quit {
+						// send signal that current worker has finished tasks and is going out of scope
+						sc.endpointWorkerDoneChan <- clusterName
+						return
+					}
+					defer cache.endpointQueue.Done(key)
+					err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
+					if err != nil {
+						glog.V(2).Infof("Failed to sync endpoint: %+v", err)
+					}
+				}()
 			}
 		}(cache, clusterName)
+		sc.endpointWorkerMap[clusterName] = true
 	}
 }
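The hunk above is the heart of the leak fix: the worker-done channel is drained without blocking (the reviewer-requested labeled break replaces the eventPending flag), and a new worker goroutine is started for a cluster only if no previous worker for it is still running. The following standalone sketch shows that shape of the pattern; the names scheduler, busy, workerDone and workFor are illustrative only and are not the controller's identifiers.

// Standalone sketch (not the controller's code) of the "at most one worker
// per cluster" pattern used in the diff above: a periodically invoked
// scheduler drains a buffered done channel, then starts workers only for
// clusters that have no worker running.
package main

import (
	"fmt"
	"time"
)

type scheduler struct {
	busy       map[string]bool // clusterName -> worker currently running?
	workerDone chan string     // workers report their cluster name here on exit
}

func (s *scheduler) schedule(clusters []string) {
	// Drain all completion signals without blocking, exactly like the
	// labeled "break ForLoop" in the diff: once the channel is empty,
	// the default case exits the loop.
DrainLoop:
	for {
		select {
		case name := <-s.workerDone:
			s.busy[name] = false
		default:
			break DrainLoop
		}
	}

	for _, name := range clusters {
		if s.busy[name] {
			continue // previous worker for this cluster is still running
		}
		go func(name string) {
			// Simulated unit of work; the real worker loops over a queue.
			time.Sleep(10 * time.Millisecond)
			fmt.Println("worker finished for", name)
			s.workerDone <- name // signal completion so a new worker may start
		}(name)
		s.busy[name] = true
	}
}

func main() {
	s := &scheduler{
		busy:       make(map[string]bool),
		workerDone: make(chan string, 16), // buffered so workers never block on exit
	}
	clusters := []string{"cluster-a", "cluster-b"}
	for i := 0; i < 3; i++ {
		s.schedule(clusters) // repeated calls reuse or restart workers, never stack them
		time.Sleep(20 * time.Millisecond)
	}
}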


@@ -36,43 +36,43 @@ import (
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (sc *ServiceController) clusterServiceWorker() {
 	// process all pending events in serviceWorkerDoneChan
-	eventPending := true
-	for eventPending {
+ForLoop:
+	for {
 		select {
 		case clusterName := <-sc.serviceWorkerDoneChan:
 			sc.serviceWorkerMap[clusterName] = false
 		default:
 			// non-blocking, comes here if all existing events are processed
-			eventPending = false
-			break
+			break ForLoop
 		}
 	}
 	for clusterName, cache := range sc.clusterCache.clientMap {
-		workerExist, keyFound := sc.serviceWorkerMap[clusterName]
-		if keyFound && workerExist {
+		workerExist, found := sc.serviceWorkerMap[clusterName]
+		if found && workerExist {
 			continue
 		}
-		sc.serviceWorkerMap[clusterName] = true
 		// create a worker only if the previous worker has finished and gone out of scope
 		go func(cache *clusterCache, clusterName string) {
			fedClient := sc.federationClient
 			for {
-				key, quit := cache.serviceQueue.Get()
-				if quit {
-					// send signal that current worker has finished tasks and is going out of scope
-					sc.serviceWorkerDoneChan <- clusterName
-					return
-				}
-				defer cache.serviceQueue.Done(key)
-				err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
-				if err != nil {
-					glog.Errorf("Failed to sync service: %+v", err)
-				}
+				func() {
+					key, quit := cache.serviceQueue.Get()
+					if quit {
+						// send signal that current worker has finished tasks and is going out of scope
+						sc.serviceWorkerDoneChan <- clusterName
+						return
+					}
+					defer cache.serviceQueue.Done(key)
+					err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
+					if err != nil {
+						glog.Errorf("Failed to sync service: %+v", err)
+					}
+				}()
 			}
 		}(cache, clusterName)
+		sc.serviceWorkerMap[clusterName] = true
 	}
 }
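Both hunks also wrap each queue iteration in an immediately invoked anonymous function. In Go, defer runs when the enclosing function returns, not at the end of a loop iteration, so without the wrapper the deferred cache.endpointQueue.Done / cache.serviceQueue.Done calls would accumulate until the worker goroutine exits. A minimal, self-contained illustration (not controller code):

// Why the diff wraps each iteration in func(){...}(): the deferred call now
// fires at the end of every iteration instead of only at goroutine exit.
package main

import "fmt"

func process(items []string) {
	for _, item := range items {
		func() {
			defer fmt.Println("done:", item) // runs when this iteration's closure returns
			fmt.Println("processing:", item)
		}()
	}
}

func main() {
	process([]string{"a", "b"})
	// Output interleaves processing/done per item; with defer placed directly
	// in the loop body, every "done" line would print only after the loop ended.
}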


@@ -68,7 +68,7 @@ const (
 	KubeAPIQPS      = 20.0
 	KubeAPIBurst    = 30
-	maxNoOfClusters = 256
+	maxNoOfClusters = 100
 )
 type cachedService struct {
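The last hunk only lowers maxNoOfClusters from 256 to 100. Assuming the constant is used, as in similar controllers, to size the buffered worker-done channels (that construction is not part of this diff), keeping the bound at least as large as the number of member clusters means a finished worker's completion send can never block. A hypothetical sketch of that use:

// Hypothetical sketch only: the diff changes the constant, not this code.
package main

import "fmt"

const maxNoOfClusters = 100 // assumed upper bound on member clusters

func main() {
	// If the done channels are sized by this bound (an assumption; their
	// construction is outside this diff), a worker's completion send stays
	// non-blocking even if every cluster's worker finishes at once.
	workerDone := make(chan string, maxNoOfClusters)
	workerDone <- "cluster-a" // buffered send, does not block
	fmt.Println(<-workerDone)
}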