123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- package concurrent
-
- import (
- "context"
- "fmt"
- "runtime"
- "runtime/debug"
- "sync"
- "time"
- "reflect"
- )
-
- // HandlePanic logs goroutine panic by default
- var HandlePanic = func(recovered interface{}, funcName string) {
- ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
- ErrorLogger.Println(string(debug.Stack()))
- }
-
// UnboundedExecutor starts goroutines without any limit on how many may be
// alive at once. It counts the goroutines it has started, grouped by the
// source location of their handler function, and shares a single cancellable
// context with all of them so they can be asked to stop together.
//
// Use NewUnboundedExecutor to obtain one: the zero value has a nil mutex and
// a nil map and is not usable.
type UnboundedExecutor struct {
	// ctx is handed to every handler started via Go.
	ctx context.Context
	// cancel cancels ctx; it is invoked by Stop and StopAndWait.
	cancel context.CancelFunc
	// activeGoroutinesMutex guards activeGoroutines.
	activeGoroutinesMutex *sync.Mutex
	// activeGoroutines maps "file:line" of a handler function to the number
	// of goroutines started from it that have not returned yet.
	activeGoroutines map[string]int
	// HandlePanic, when non-nil, is called with the recovered value and the
	// handler's function name instead of the package-level HandlePanic.
	HandlePanic func(recovered interface{}, funcName string)
}
-
- // GlobalUnboundedExecutor has the life cycle of the program itself
- // any goroutine want to be shutdown before main exit can be started from this executor
- // GlobalUnboundedExecutor expects the main function to call stop
- // it does not magically knows the main function exits
- var GlobalUnboundedExecutor = NewUnboundedExecutor()
-
- // NewUnboundedExecutor creates a new UnboundedExecutor,
- // UnboundedExecutor can not be created by &UnboundedExecutor{}
- // HandlePanic can be set with a callback to override global HandlePanic
- func NewUnboundedExecutor() *UnboundedExecutor {
- ctx, cancel := context.WithCancel(context.TODO())
- return &UnboundedExecutor{
- ctx: ctx,
- cancel: cancel,
- activeGoroutinesMutex: &sync.Mutex{},
- activeGoroutines: map[string]int{},
- }
- }
-
- // Go starts a new goroutine and tracks its lifecycle.
- // Panic will be recovered and logged automatically, except for StopSignal
- func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
- pc := reflect.ValueOf(handler).Pointer()
- f := runtime.FuncForPC(pc)
- funcName := f.Name()
- file, line := f.FileLine(pc)
- executor.activeGoroutinesMutex.Lock()
- defer executor.activeGoroutinesMutex.Unlock()
- startFrom := fmt.Sprintf("%s:%d", file, line)
- executor.activeGoroutines[startFrom] += 1
- go func() {
- defer func() {
- recovered := recover()
- // if you want to quit a goroutine without trigger HandlePanic
- // use runtime.Goexit() to quit
- if recovered != nil {
- if executor.HandlePanic == nil {
- HandlePanic(recovered, funcName)
- } else {
- executor.HandlePanic(recovered, funcName)
- }
- }
- executor.activeGoroutinesMutex.Lock()
- executor.activeGoroutines[startFrom] -= 1
- executor.activeGoroutinesMutex.Unlock()
- }()
- handler(executor.ctx)
- }()
- }
-
- // Stop cancel all goroutines started by this executor without wait
- func (executor *UnboundedExecutor) Stop() {
- executor.cancel()
- }
-
- // StopAndWaitForever cancel all goroutines started by this executor and
- // wait until all goroutines exited
- func (executor *UnboundedExecutor) StopAndWaitForever() {
- executor.StopAndWait(context.Background())
- }
-
- // StopAndWait cancel all goroutines started by this executor and wait.
- // Wait can be cancelled by the context passed in.
- func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
- executor.cancel()
- for {
- oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
- select {
- case <-oneHundredMilliseconds.C:
- if executor.checkNoActiveGoroutines() {
- return
- }
- case <-ctx.Done():
- return
- }
- }
- }
-
- func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
- executor.activeGoroutinesMutex.Lock()
- defer executor.activeGoroutinesMutex.Unlock()
- for startFrom, count := range executor.activeGoroutines {
- if count > 0 {
- InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
- "startFrom", startFrom,
- "count", count)
- return false
- }
- }
- return true
- }
|