aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/modern-go/concurrent/unbounded_executor.go
blob: 05a77dceb1e23e95882e18ed707fb682fbdeec44 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package concurrent

import (
	"context"
	"fmt"
	"runtime"
	"runtime/debug"
	"sync"
	"time"
	"reflect"
)

// HandlePanic is the package-level panic callback. By default it writes the
// recovered value, prefixed with the name of the function that panicked,
// followed by the current stack trace, to ErrorLogger.
var HandlePanic = func(recovered interface{}, funcName string) {
	message := fmt.Sprintf("%s panic: %v", funcName, recovered)
	ErrorLogger.Println(message)
	stack := debug.Stack()
	ErrorLogger.Println(string(stack))
}

// UnboundedExecutor is an executor that places no limit on the number of
// live goroutines. It tracks the goroutines it starts and can cancel them
// on shutdown.
type UnboundedExecutor struct {
	// ctx is handed to every handler started through Go.
	ctx                   context.Context
	// cancel cancels ctx; it is invoked by Stop and StopAndWait.
	cancel                context.CancelFunc
	// activeGoroutinesMutex guards activeGoroutines.
	activeGoroutinesMutex *sync.Mutex
	// activeGoroutines counts running goroutines, keyed by the "file:line"
	// location of the handler function they were started with.
	activeGoroutines      map[string]int
	// HandlePanic, when non-nil, is called instead of the package-level
	// HandlePanic for panics recovered from goroutines of this executor.
	HandlePanic           func(recovered interface{}, funcName string)
}

// GlobalUnboundedExecutor shares the life cycle of the program itself.
// Any goroutine that should be shut down before main exits can be started
// from this executor.
// GlobalUnboundedExecutor expects the main function to call one of its stop
// methods; it has no way of knowing by itself that the main function exited.
var GlobalUnboundedExecutor = NewUnboundedExecutor()

// NewUnboundedExecutor builds a ready-to-use UnboundedExecutor with its own
// cancellable context, an initialised mutex and an empty goroutine counter map.
// An UnboundedExecutor must not be created with &UnboundedExecutor{}, because
// that would leave those fields nil.
// The HandlePanic field of the result may be set to a callback that overrides
// the package-level HandlePanic.
func NewUnboundedExecutor() *UnboundedExecutor {
	executor := &UnboundedExecutor{
		activeGoroutinesMutex: new(sync.Mutex),
		activeGoroutines:      make(map[string]int),
	}
	executor.ctx, executor.cancel = context.WithCancel(context.TODO())
	return executor
}

// Go runs handler in a new goroutine and tracks that goroutine until it exits.
// The handler receives the executor's context, which is cancelled when the
// executor is stopped.
// A panic raised by handler is recovered and passed to executor.HandlePanic
// or, when that field is nil, to the package-level HandlePanic.
// A handler that wants to quit without triggering the panic callback can
// call runtime.Goexit().
func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
	// Identify the handler by the source position of its entry point so that
	// goroutines started from the same function share one counter.
	entry := reflect.ValueOf(handler).Pointer()
	fn := runtime.FuncForPC(entry)
	funcName := fn.Name()
	srcFile, srcLine := fn.FileLine(entry)
	key := fmt.Sprintf("%s:%d", srcFile, srcLine)

	executor.activeGoroutinesMutex.Lock()
	defer executor.activeGoroutinesMutex.Unlock()
	executor.activeGoroutines[key]++

	go func() {
		defer func() {
			// recover yields nil both on normal return and on
			// runtime.Goexit(), so neither reaches the panic callback.
			if recovered := recover(); recovered != nil {
				onPanic := executor.HandlePanic
				if onPanic == nil {
					onPanic = HandlePanic
				}
				onPanic(recovered, funcName)
			}
			// The goroutine is finished whichever way it ended: release its slot.
			executor.activeGoroutinesMutex.Lock()
			defer executor.activeGoroutinesMutex.Unlock()
			executor.activeGoroutines[key]--
		}()
		handler(executor.ctx)
	}()
}

// Stop cancels the context passed to all goroutines started by this executor
// and returns immediately, without waiting for them to exit.
func (executor *UnboundedExecutor) Stop() {
	executor.cancel()
}

// StopAndWaitForever cancels all goroutines started by this executor and
// blocks until all of them have exited.
func (executor *UnboundedExecutor) StopAndWaitForever() {
	// context.Background() is never cancelled, so the wait has no deadline.
	executor.StopAndWait(context.Background())
}

// StopAndWait cancels all goroutines started by this executor and waits for
// them to exit.
//
// The executor's context is cancelled first. The set of active goroutines is
// then polled every 100 milliseconds (the first poll happens 100 milliseconds
// after the call) until none is left. The wait is abandoned as soon as ctx is
// done, in which case goroutines may still be running when StopAndWait
// returns.
func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
	executor.cancel()

	const pollInterval = 100 * time.Millisecond
	// A single timer is reused for every poll and released on return, instead
	// of allocating a new one per iteration and leaving it pending when the
	// wait is cancelled through ctx.
	timer := time.NewTimer(pollInterval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			if executor.checkNoActiveGoroutines() {
				return
			}
			// The timer has fired and its channel was drained by the receive
			// above, so it can be re-armed directly.
			timer.Reset(pollInterval)
		case <-ctx.Done():
			return
		}
	}
}

// checkNoActiveGoroutines reports whether every tracked start location has no
// running goroutine left. When it finds a location whose counter is still
// positive it logs that location and its count to InfoLogger and returns
// false right away.
func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
	executor.activeGoroutinesMutex.Lock()
	defer executor.activeGoroutinesMutex.Unlock()
	for origin, alive := range executor.activeGoroutines {
		if alive <= 0 {
			continue
		}
		InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
			"startFrom", origin,
			"count", alive)
		return false
	}
	return true
}