Fix goroutine leak in federation service controller

shashidharatd
2016-09-21 19:11:00 +05:30
parent cba0c6fb16
commit d8ff4870cb
3 changed files with 83 additions and 25 deletions

View File

@@ -31,22 +31,42 @@ import (
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (sc *ServiceController) clusterEndpointWorker() {
-	fedClient := sc.federationClient
+	// process all pending events in endpointWorkerDoneChan
+	eventPending := true
+	for eventPending {
+		select {
+		case clusterName := <-sc.endpointWorkerDoneChan:
+			sc.endpointWorkerMap[clusterName] = false
+		default:
+			// non-blocking, comes here if all existing events are processed
+			eventPending = false
+			break
+		}
+	}
+
 	for clusterName, cache := range sc.clusterCache.clientMap {
+		workerExist, keyFound := sc.endpointWorkerMap[clusterName]
+		if keyFound && workerExist {
+			continue
+		}
+		sc.endpointWorkerMap[clusterName] = true
+
+		// create a worker only if the previous worker has finished and gone out of scope
 		go func(cache *clusterCache, clusterName string) {
+			fedClient := sc.federationClient
 			for {
-				func() {
-					key, quit := cache.endpointQueue.Get()
-					// update endpoint cache
-					if quit {
-						return
-					}
-					defer cache.endpointQueue.Done(key)
-					err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
-					if err != nil {
-						glog.V(2).Infof("Failed to sync endpoint: %+v", err)
-					}
-				}()
+				key, quit := cache.endpointQueue.Get()
+				// update endpoint cache
+				if quit {
+					// send signal that current worker has finished tasks and is going out of scope
+					sc.endpointWorkerDoneChan <- clusterName
+					return
+				}
+				defer cache.endpointQueue.Done(key)
+				err := sc.clusterCache.syncEndpoint(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
+				if err != nil {
+					glog.V(2).Infof("Failed to sync endpoint: %+v", err)
+				}
 			}
 		}(cache, clusterName)
 	}
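Note on the hunk above (not part of the committed code): the worker functions appear to be invoked repeatedly by the controller (the call sites are not shown in this diff), and before this fix every invocation started a fresh goroutine per cluster even while the previous workers were still blocked on their queues, which is the goroutine leak. The fix tracks live workers in a per-cluster map and drains a done channel on which exiting workers report, so a new goroutine is started only when the previous one is gone. The sketch below is a minimal, self-contained illustration of that bookkeeping pattern; the names (controller, spawnWorkers, workerLive, workerDone) are illustrative and not taken from the Kubernetes source.

package main

import (
	"fmt"
	"time"
)

// controller is a toy stand-in for the federation ServiceController: it is
// resynced periodically, and on every resync it wants one worker goroutine
// per known cluster, but never more than one.
type controller struct {
	clusters   map[string]chan string // clusterName -> that cluster's work queue
	workerLive map[string]bool        // clusterName -> a worker is currently running
	workerDone chan string            // exiting workers report their cluster name here
}

// spawnWorkers mirrors the structure of clusterEndpointWorker after the fix:
// first drain workerDone to mark exited workers, then start a worker only for
// clusters that do not already have one.
func (c *controller) spawnWorkers() {
	// Drain exit notifications without blocking.
	for draining := true; draining; {
		select {
		case name := <-c.workerDone:
			c.workerLive[name] = false
		default:
			draining = false
		}
	}

	for name, queue := range c.clusters {
		if c.workerLive[name] {
			continue // previous worker still running; do not leak another one
		}
		c.workerLive[name] = true
		go func(name string, queue chan string) {
			for item := range queue {
				fmt.Printf("cluster %s: processed %s\n", name, item)
			}
			// Queue closed: tell the controller this worker is gone so a
			// later spawnWorkers call may start a replacement.
			c.workerDone <- name
		}(name, queue)
	}
}

func main() {
	c := &controller{
		clusters:   map[string]chan string{"cluster-a": make(chan string, 4)},
		workerLive: map[string]bool{},
		workerDone: make(chan string, 16), // buffered so an exiting worker never blocks
	}

	// Simulate periodic resyncs; only the first call actually starts a worker.
	for i := 0; i < 3; i++ {
		c.spawnWorkers()
	}
	c.clusters["cluster-a"] <- "endpoint-update"
	close(c.clusters["cluster-a"])
	time.Sleep(50 * time.Millisecond) // toy example only: give the worker time to drain
}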

View File

@@ -35,21 +35,42 @@ import (
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (sc *ServiceController) clusterServiceWorker() {
-	fedClient := sc.federationClient
+	// process all pending events in serviceWorkerDoneChan
+	eventPending := true
+	for eventPending {
+		select {
+		case clusterName := <-sc.serviceWorkerDoneChan:
+			sc.serviceWorkerMap[clusterName] = false
+		default:
+			// non-blocking, comes here if all existing events are processed
+			eventPending = false
+			break
+		}
+	}
+
 	for clusterName, cache := range sc.clusterCache.clientMap {
+		workerExist, keyFound := sc.serviceWorkerMap[clusterName]
+		if keyFound && workerExist {
+			continue
+		}
+		sc.serviceWorkerMap[clusterName] = true
+
+		// create a worker only if the previous worker has finished and gone out of scope
 		go func(cache *clusterCache, clusterName string) {
+			fedClient := sc.federationClient
 			for {
-				func() {
-					key, quit := cache.serviceQueue.Get()
-					defer cache.serviceQueue.Done(key)
-					if quit {
-						return
-					}
-					err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
-					if err != nil {
-						glog.Errorf("Failed to sync service: %+v", err)
-					}
-				}()
+				key, quit := cache.serviceQueue.Get()
+				if quit {
+					// send signal that current worker has finished tasks and is going out of scope
+					sc.serviceWorkerDoneChan <- clusterName
+					return
+				}
+				defer cache.serviceQueue.Done(key)
+				err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
+				if err != nil {
+					glog.Errorf("Failed to sync service: %+v", err)
+				}
 			}
 		}(cache, clusterName)
 	}

View File

@@ -67,6 +67,8 @@ const (
 	UserAgentName = "federation-service-controller"
 	KubeAPIQPS = 20.0
 	KubeAPIBurst = 30
+
+	maxNoOfClusters = 256
 )

 type cachedService struct {
@@ -118,6 +120,16 @@ type ServiceController struct {
 	// services that need to be synced
 	queue *workqueue.Type
 	knownClusterSet sets.String
+	// endpoint worker map contains all the clusters registered with an indication that worker exist
+	// key clusterName
+	endpointWorkerMap map[string]bool
+	// channel for worker to signal that it is going out of existence
+	endpointWorkerDoneChan chan string
+	// service worker map contains all the clusters registered with an indication that worker exist
+	// key clusterName
+	serviceWorkerMap map[string]bool
+	// channel for worker to signal that it is going out of existence
+	serviceWorkerDoneChan chan string
 }

 // New returns a new service controller to keep DNS provider service resources
@@ -204,6 +216,11 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte
 		},
 		},
 	)
+
+	s.endpointWorkerMap = make(map[string]bool)
+	s.serviceWorkerMap = make(map[string]bool)
+	s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters)
+	s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters)
 	return s
 }
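A note on the sizing of the done channels (an inference from the diff, not something stated in the commit message): an exiting worker performs an ordinary send on its done channel, and that send is only drained the next time the worker function runs. Buffering the channels with capacity maxNoOfClusters means that, with at most one worker per cluster, the send always finds room, so a worker shutting down never blocks on the notification itself, which would otherwise be another way to strand a goroutine. A tiny illustration, with nothing below taken from the Kubernetes source:

package main

import "fmt"

func main() {
	const maxNoOfClusters = 256 // same idea as the constant added above

	// One slot per possible cluster: a send from an exiting worker can
	// always complete immediately, even if nothing is draining yet.
	done := make(chan string, maxNoOfClusters)

	go func() { done <- "cluster-a" }() // never blocks while the buffer has room

	// The next resync pass drains notifications like this (the non-blocking
	// drain loop is shown in the worker functions in the diff).
	fmt.Println(<-done)
}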