-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.go
69 lines (62 loc) · 1.21 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package factory
import (
"fmt"
"sync/atomic"
"time"
)
// exitSignal is the sentinel value sent on a worker's params channel (by
// worker.shutdown) to tell worker.process to stop and report quit = true.
type exitSignal struct{}
// worker is the worker role: it holds the state for one goroutine that
// receives task parameters over a channel and runs an action on them.
type worker struct {
	// isBusy is a flag read and written through sync/atomic:
	// 0 means idle, 1 means a task has been claimed (or the worker is stopping).
	isBusy int32

	// action is the function invoked with each received task parameter.
	// assign overwrites it with the line's action before sending the task.
	action func(interface{})

	// params carries task parameters to the worker goroutine; an exitSignal
	// value on this channel asks the worker to stop.
	params chan interface{}
}
// process receives task parameters from w.params and runs w.action on each
// one, clearing the busy flag after every completed task.
//
// It returns true when an exitSignal is received. It returns false when the
// params channel is closed. If w.action panics, the deferred handlePanic call
// logs the panic value and stack trace; quit is then left at its zero value
// (false) — this assumes handlePanic recovers the panic, which is defined
// outside this block (TODO confirm).
func (w *worker) process() (quit bool) {
	defer handlePanic(func(cause interface{}) {
		fmt.Printf("worker broken, panic: %v, call stack: \n%s", cause, stackTrace(0))
	})

	for {
		task, open := <-w.params
		if !open {
			// Channel closed without an exit signal.
			return false
		}

		switch task.(type) {
		case exitSignal:
			// Stop request: tell the caller to leave its loop.
			return true
		}

		w.action(task)
		// The task is done, so the worker can be claimed again.
		atomic.StoreInt32(&w.isBusy, 0)
	}
}
// assign tries to hand one task to this worker on behalf of line.
//
// It atomically flips isBusy from 0 to 1; if the worker was already busy it
// returns false without touching anything else. On success it increments
// line.waitGroup, installs line.action as the worker's action, sends params
// on the worker's channel (blocking until the worker receives it), and
// returns true.
func (w *worker) assign(line *Line, params interface{}) bool {
	claimed := atomic.CompareAndSwapInt32(&w.isBusy, 0, 1)
	if !claimed {
		return false
	}

	line.waitGroup.Add(1)
	// The action must be set before the send so the worker goroutine sees it
	// when it receives params.
	w.action = line.action
	w.params <- params
	return true
}
// shutdown asks the worker to stop by sending an exitSignal on its params
// channel. The send blocks until the worker goroutine receives the value.
func (w *worker) shutdown() {
	var stop exitSignal
	w.params <- stop
}
// newWorker creates a worker with an unbuffered params channel and starts the
// goroutine that serves it.
//
// The goroutine calls process in a loop until process reports that an exit
// signal was received. Whenever process returns without an exit signal, the
// busy flag is cleared so the worker can be assigned again. After the exit
// signal, the worker is marked busy so assign refuses new tasks, one possibly
// in-flight task is drained (waiting at most one millisecond), and the params
// channel is closed.
func newWorker() (w *worker) {
	w = &worker{
		params: make(chan interface{}),
	}
	go func(w *worker) {
		for {
			if w.process() {
				break
			}
			// process returned without an exit signal; make the worker
			// available again before re-entering the loop.
			atomic.StoreInt32(&w.isBusy, 0)
		}
		// Mark the worker as busy so that assign no longer claims it.
		atomic.StoreInt32(&w.isBusy, 1)
		// Close the task channel once the final drain below has finished.
		defer close(w.params)
		// A task may still be pending: an assign call that claimed the worker
		// before the flag was set above could be blocked on its send.
		select {
		case params := <-w.params:
			// A repeated shutdown request is not a task; never pass the
			// sentinel to the user action (which may also be nil if the
			// worker was never assigned).
			if _, ok := params.(exitSignal); !ok {
				w.action(params)
			}
		case <-time.After(time.Millisecond):
			// One-shot timeout. time.After is used instead of time.Tick,
			// whose underlying ticker can never be stopped.
		}
	}(w)
	return
}