diff options
Diffstat (limited to 'vendor/github.com/modern-go/concurrent/unbounded_executor.go')
| -rw-r--r-- | vendor/github.com/modern-go/concurrent/unbounded_executor.go | 119 |
1 files changed, 0 insertions, 119 deletions
diff --git a/vendor/github.com/modern-go/concurrent/unbounded_executor.go b/vendor/github.com/modern-go/concurrent/unbounded_executor.go deleted file mode 100644 index 05a77dc..0000000 --- a/vendor/github.com/modern-go/concurrent/unbounded_executor.go +++ /dev/null | |||
| @@ -1,119 +0,0 @@ | |||
| 1 | package concurrent | ||
| 2 | |||
| 3 | import ( | ||
| 4 | "context" | ||
| 5 | "fmt" | ||
| 6 | "runtime" | ||
| 7 | "runtime/debug" | ||
| 8 | "sync" | ||
| 9 | "time" | ||
| 10 | "reflect" | ||
| 11 | ) | ||
| 12 | |||
| 13 | // HandlePanic logs goroutine panic by default | ||
| 14 | var HandlePanic = func(recovered interface{}, funcName string) { | ||
| 15 | ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered)) | ||
| 16 | ErrorLogger.Println(string(debug.Stack())) | ||
| 17 | } | ||
| 18 | |||
| 19 | // UnboundedExecutor is a executor without limits on counts of alive goroutines | ||
| 20 | // it tracks the goroutine started by it, and can cancel them when shutdown | ||
| 21 | type UnboundedExecutor struct { | ||
| 22 | ctx context.Context | ||
| 23 | cancel context.CancelFunc | ||
| 24 | activeGoroutinesMutex *sync.Mutex | ||
| 25 | activeGoroutines map[string]int | ||
| 26 | HandlePanic func(recovered interface{}, funcName string) | ||
| 27 | } | ||
| 28 | |||
| 29 | // GlobalUnboundedExecutor has the life cycle of the program itself | ||
| 30 | // any goroutine want to be shutdown before main exit can be started from this executor | ||
| 31 | // GlobalUnboundedExecutor expects the main function to call stop | ||
| 32 | // it does not magically knows the main function exits | ||
| 33 | var GlobalUnboundedExecutor = NewUnboundedExecutor() | ||
| 34 | |||
| 35 | // NewUnboundedExecutor creates a new UnboundedExecutor, | ||
| 36 | // UnboundedExecutor can not be created by &UnboundedExecutor{} | ||
| 37 | // HandlePanic can be set with a callback to override global HandlePanic | ||
| 38 | func NewUnboundedExecutor() *UnboundedExecutor { | ||
| 39 | ctx, cancel := context.WithCancel(context.TODO()) | ||
| 40 | return &UnboundedExecutor{ | ||
| 41 | ctx: ctx, | ||
| 42 | cancel: cancel, | ||
| 43 | activeGoroutinesMutex: &sync.Mutex{}, | ||
| 44 | activeGoroutines: map[string]int{}, | ||
| 45 | } | ||
| 46 | } | ||
| 47 | |||
| 48 | // Go starts a new goroutine and tracks its lifecycle. | ||
| 49 | // Panic will be recovered and logged automatically, except for StopSignal | ||
| 50 | func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) { | ||
| 51 | pc := reflect.ValueOf(handler).Pointer() | ||
| 52 | f := runtime.FuncForPC(pc) | ||
| 53 | funcName := f.Name() | ||
| 54 | file, line := f.FileLine(pc) | ||
| 55 | executor.activeGoroutinesMutex.Lock() | ||
| 56 | defer executor.activeGoroutinesMutex.Unlock() | ||
| 57 | startFrom := fmt.Sprintf("%s:%d", file, line) | ||
| 58 | executor.activeGoroutines[startFrom] += 1 | ||
| 59 | go func() { | ||
| 60 | defer func() { | ||
| 61 | recovered := recover() | ||
| 62 | // if you want to quit a goroutine without trigger HandlePanic | ||
| 63 | // use runtime.Goexit() to quit | ||
| 64 | if recovered != nil { | ||
| 65 | if executor.HandlePanic == nil { | ||
| 66 | HandlePanic(recovered, funcName) | ||
| 67 | } else { | ||
| 68 | executor.HandlePanic(recovered, funcName) | ||
| 69 | } | ||
| 70 | } | ||
| 71 | executor.activeGoroutinesMutex.Lock() | ||
| 72 | executor.activeGoroutines[startFrom] -= 1 | ||
| 73 | executor.activeGoroutinesMutex.Unlock() | ||
| 74 | }() | ||
| 75 | handler(executor.ctx) | ||
| 76 | }() | ||
| 77 | } | ||
| 78 | |||
| 79 | // Stop cancel all goroutines started by this executor without wait | ||
| 80 | func (executor *UnboundedExecutor) Stop() { | ||
| 81 | executor.cancel() | ||
| 82 | } | ||
| 83 | |||
| 84 | // StopAndWaitForever cancel all goroutines started by this executor and | ||
| 85 | // wait until all goroutines exited | ||
| 86 | func (executor *UnboundedExecutor) StopAndWaitForever() { | ||
| 87 | executor.StopAndWait(context.Background()) | ||
| 88 | } | ||
| 89 | |||
| 90 | // StopAndWait cancel all goroutines started by this executor and wait. | ||
| 91 | // Wait can be cancelled by the context passed in. | ||
| 92 | func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) { | ||
| 93 | executor.cancel() | ||
| 94 | for { | ||
| 95 | oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100) | ||
| 96 | select { | ||
| 97 | case <-oneHundredMilliseconds.C: | ||
| 98 | if executor.checkNoActiveGoroutines() { | ||
| 99 | return | ||
| 100 | } | ||
| 101 | case <-ctx.Done(): | ||
| 102 | return | ||
| 103 | } | ||
| 104 | } | ||
| 105 | } | ||
| 106 | |||
| 107 | func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool { | ||
| 108 | executor.activeGoroutinesMutex.Lock() | ||
| 109 | defer executor.activeGoroutinesMutex.Unlock() | ||
| 110 | for startFrom, count := range executor.activeGoroutines { | ||
| 111 | if count > 0 { | ||
| 112 | InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit", | ||
| 113 | "startFrom", startFrom, | ||
| 114 | "count", count) | ||
| 115 | return false | ||
| 116 | } | ||
| 117 | } | ||
| 118 | return true | ||
| 119 | } | ||