diff options
Diffstat (limited to 'vendor/github.com/modern-go/concurrent/unbounded_executor.go')
-rw-r--r-- | vendor/github.com/modern-go/concurrent/unbounded_executor.go | 119 |
1 files changed, 0 insertions, 119 deletions
diff --git a/vendor/github.com/modern-go/concurrent/unbounded_executor.go b/vendor/github.com/modern-go/concurrent/unbounded_executor.go deleted file mode 100644 index 05a77dc..0000000 --- a/vendor/github.com/modern-go/concurrent/unbounded_executor.go +++ /dev/null | |||
@@ -1,119 +0,0 @@ | |||
1 | package concurrent | ||
2 | |||
3 | import ( | ||
4 | "context" | ||
5 | "fmt" | ||
6 | "runtime" | ||
7 | "runtime/debug" | ||
8 | "sync" | ||
9 | "time" | ||
10 | "reflect" | ||
11 | ) | ||
12 | |||
13 | // HandlePanic logs goroutine panic by default | ||
14 | var HandlePanic = func(recovered interface{}, funcName string) { | ||
15 | ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered)) | ||
16 | ErrorLogger.Println(string(debug.Stack())) | ||
17 | } | ||
18 | |||
19 | // UnboundedExecutor is a executor without limits on counts of alive goroutines | ||
20 | // it tracks the goroutine started by it, and can cancel them when shutdown | ||
21 | type UnboundedExecutor struct { | ||
22 | ctx context.Context | ||
23 | cancel context.CancelFunc | ||
24 | activeGoroutinesMutex *sync.Mutex | ||
25 | activeGoroutines map[string]int | ||
26 | HandlePanic func(recovered interface{}, funcName string) | ||
27 | } | ||
28 | |||
29 | // GlobalUnboundedExecutor has the life cycle of the program itself | ||
30 | // any goroutine want to be shutdown before main exit can be started from this executor | ||
31 | // GlobalUnboundedExecutor expects the main function to call stop | ||
32 | // it does not magically knows the main function exits | ||
33 | var GlobalUnboundedExecutor = NewUnboundedExecutor() | ||
34 | |||
35 | // NewUnboundedExecutor creates a new UnboundedExecutor, | ||
36 | // UnboundedExecutor can not be created by &UnboundedExecutor{} | ||
37 | // HandlePanic can be set with a callback to override global HandlePanic | ||
38 | func NewUnboundedExecutor() *UnboundedExecutor { | ||
39 | ctx, cancel := context.WithCancel(context.TODO()) | ||
40 | return &UnboundedExecutor{ | ||
41 | ctx: ctx, | ||
42 | cancel: cancel, | ||
43 | activeGoroutinesMutex: &sync.Mutex{}, | ||
44 | activeGoroutines: map[string]int{}, | ||
45 | } | ||
46 | } | ||
47 | |||
48 | // Go starts a new goroutine and tracks its lifecycle. | ||
49 | // Panic will be recovered and logged automatically, except for StopSignal | ||
50 | func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) { | ||
51 | pc := reflect.ValueOf(handler).Pointer() | ||
52 | f := runtime.FuncForPC(pc) | ||
53 | funcName := f.Name() | ||
54 | file, line := f.FileLine(pc) | ||
55 | executor.activeGoroutinesMutex.Lock() | ||
56 | defer executor.activeGoroutinesMutex.Unlock() | ||
57 | startFrom := fmt.Sprintf("%s:%d", file, line) | ||
58 | executor.activeGoroutines[startFrom] += 1 | ||
59 | go func() { | ||
60 | defer func() { | ||
61 | recovered := recover() | ||
62 | // if you want to quit a goroutine without trigger HandlePanic | ||
63 | // use runtime.Goexit() to quit | ||
64 | if recovered != nil { | ||
65 | if executor.HandlePanic == nil { | ||
66 | HandlePanic(recovered, funcName) | ||
67 | } else { | ||
68 | executor.HandlePanic(recovered, funcName) | ||
69 | } | ||
70 | } | ||
71 | executor.activeGoroutinesMutex.Lock() | ||
72 | executor.activeGoroutines[startFrom] -= 1 | ||
73 | executor.activeGoroutinesMutex.Unlock() | ||
74 | }() | ||
75 | handler(executor.ctx) | ||
76 | }() | ||
77 | } | ||
78 | |||
79 | // Stop cancel all goroutines started by this executor without wait | ||
80 | func (executor *UnboundedExecutor) Stop() { | ||
81 | executor.cancel() | ||
82 | } | ||
83 | |||
84 | // StopAndWaitForever cancel all goroutines started by this executor and | ||
85 | // wait until all goroutines exited | ||
86 | func (executor *UnboundedExecutor) StopAndWaitForever() { | ||
87 | executor.StopAndWait(context.Background()) | ||
88 | } | ||
89 | |||
90 | // StopAndWait cancel all goroutines started by this executor and wait. | ||
91 | // Wait can be cancelled by the context passed in. | ||
92 | func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) { | ||
93 | executor.cancel() | ||
94 | for { | ||
95 | oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100) | ||
96 | select { | ||
97 | case <-oneHundredMilliseconds.C: | ||
98 | if executor.checkNoActiveGoroutines() { | ||
99 | return | ||
100 | } | ||
101 | case <-ctx.Done(): | ||
102 | return | ||
103 | } | ||
104 | } | ||
105 | } | ||
106 | |||
107 | func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool { | ||
108 | executor.activeGoroutinesMutex.Lock() | ||
109 | defer executor.activeGoroutinesMutex.Unlock() | ||
110 | for startFrom, count := range executor.activeGoroutines { | ||
111 | if count > 0 { | ||
112 | InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit", | ||
113 | "startFrom", startFrom, | ||
114 | "count", count) | ||
115 | return false | ||
116 | } | ||
117 | } | ||
118 | return true | ||
119 | } | ||