forked from github/multus-cni
The migration to Go modules is still in progress, so for now the Multus team has decided to keep the vendor directory to support builds without Go modules.
120 lines
3.6 KiB
Go
120 lines
3.6 KiB
Go
package concurrent
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"runtime"
|
|
"runtime/debug"
|
|
"sync"
|
|
"time"
|
|
"reflect"
|
|
)
|
|
|
|
// HandlePanic logs goroutine panic by default
|
|
var HandlePanic = func(recovered interface{}, funcName string) {
|
|
ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
|
|
ErrorLogger.Println(string(debug.Stack()))
|
|
}
|
|
|
|
// UnboundedExecutor starts goroutines without any cap on how many may be alive
// at once. It counts the goroutines it has started and can cancel them through
// a shared context when it is stopped.
type UnboundedExecutor struct {
	// ctx is handed to every handler started through Go.
	ctx context.Context
	// cancel cancels ctx; Stop and StopAndWait call it.
	cancel context.CancelFunc
	// activeGoroutinesMutex guards activeGoroutines.
	activeGoroutinesMutex *sync.Mutex
	// activeGoroutines counts running goroutines, keyed by the "file:line"
	// at which their handler function is defined.
	activeGoroutines map[string]int
	// HandlePanic, when non-nil, is called with a recovered panic value and the
	// handler's function name instead of the package-level HandlePanic.
	HandlePanic func(recovered interface{}, funcName string)
}
|
|
|
|
// GlobalUnboundedExecutor has the life cycle of the program itself
|
|
// any goroutine want to be shutdown before main exit can be started from this executor
|
|
// GlobalUnboundedExecutor expects the main function to call stop
|
|
// it does not magically knows the main function exits
|
|
var GlobalUnboundedExecutor = NewUnboundedExecutor()
|
|
|
|
// NewUnboundedExecutor creates a new UnboundedExecutor,
|
|
// UnboundedExecutor can not be created by &UnboundedExecutor{}
|
|
// HandlePanic can be set with a callback to override global HandlePanic
|
|
func NewUnboundedExecutor() *UnboundedExecutor {
|
|
ctx, cancel := context.WithCancel(context.TODO())
|
|
return &UnboundedExecutor{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
activeGoroutinesMutex: &sync.Mutex{},
|
|
activeGoroutines: map[string]int{},
|
|
}
|
|
}
|
|
|
|
// Go starts a new goroutine and tracks its lifecycle.
|
|
// Panic will be recovered and logged automatically, except for StopSignal
|
|
func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
|
|
pc := reflect.ValueOf(handler).Pointer()
|
|
f := runtime.FuncForPC(pc)
|
|
funcName := f.Name()
|
|
file, line := f.FileLine(pc)
|
|
executor.activeGoroutinesMutex.Lock()
|
|
defer executor.activeGoroutinesMutex.Unlock()
|
|
startFrom := fmt.Sprintf("%s:%d", file, line)
|
|
executor.activeGoroutines[startFrom] += 1
|
|
go func() {
|
|
defer func() {
|
|
recovered := recover()
|
|
// if you want to quit a goroutine without trigger HandlePanic
|
|
// use runtime.Goexit() to quit
|
|
if recovered != nil {
|
|
if executor.HandlePanic == nil {
|
|
HandlePanic(recovered, funcName)
|
|
} else {
|
|
executor.HandlePanic(recovered, funcName)
|
|
}
|
|
}
|
|
executor.activeGoroutinesMutex.Lock()
|
|
executor.activeGoroutines[startFrom] -= 1
|
|
executor.activeGoroutinesMutex.Unlock()
|
|
}()
|
|
handler(executor.ctx)
|
|
}()
|
|
}
|
|
|
|
// Stop cancel all goroutines started by this executor without wait
|
|
func (executor *UnboundedExecutor) Stop() {
|
|
executor.cancel()
|
|
}
|
|
|
|
// StopAndWaitForever cancel all goroutines started by this executor and
|
|
// wait until all goroutines exited
|
|
func (executor *UnboundedExecutor) StopAndWaitForever() {
|
|
executor.StopAndWait(context.Background())
|
|
}
|
|
|
|
// StopAndWait cancel all goroutines started by this executor and wait.
|
|
// Wait can be cancelled by the context passed in.
|
|
func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
|
|
executor.cancel()
|
|
for {
|
|
oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
|
|
select {
|
|
case <-oneHundredMilliseconds.C:
|
|
if executor.checkNoActiveGoroutines() {
|
|
return
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
|
|
executor.activeGoroutinesMutex.Lock()
|
|
defer executor.activeGoroutinesMutex.Unlock()
|
|
for startFrom, count := range executor.activeGoroutines {
|
|
if count > 0 {
|
|
InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
|
|
"startFrom", startFrom,
|
|
"count", count)
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|