-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage_queue.go
156 lines (137 loc) · 3.01 KB
/
message_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package wkrpc
import (
"sync"
"github.com/WuKongIM/wklog"
"github.com/WuKongIM/wkrpc/proto"
"github.com/panjf2000/gnet/v2"
"go.uber.org/zap"
)
type message struct {
msgType proto.MsgType
data []byte
conn gnet.Conn
}
func (m *message) size() int {
return len(m.data)
}
type messageQueue struct {
ch chan struct{}
rl *RateLimiter // 限制字节流量速度
lazyFreeCycle uint64 // 懒惰释放周期,n表示n次释放一次
size uint64
left []*message // 左边队列
right []*message // 右边队列, 左右的目的是为了重复利用内存
nodrop []*message // 不能drop的消息
mu sync.Mutex
leftInWrite bool // 写入时是否使用左边队列
idx uint64 // 当前写入的位置下标
oldIdx uint64
cycle uint64
wklog.Log
}
func newMessageQueue(size uint64, ch bool,
lazyFreeCycle uint64, maxMemorySize uint64) *messageQueue {
q := &messageQueue{
rl: NewRateLimiter(maxMemorySize),
size: size,
lazyFreeCycle: lazyFreeCycle,
left: make([]*message, size),
right: make([]*message, size),
nodrop: make([]*message, 0),
Log: wklog.NewWKLog("messageQueue"),
}
if ch {
q.ch = make(chan struct{}, 1)
}
return q
}
func (q *messageQueue) add(msg *message) bool {
q.mu.Lock()
defer q.mu.Unlock()
if q.idx >= q.size {
return false
}
if !q.tryAdd(msg) {
return false
}
w := q.targetQueue()
w[q.idx] = msg
q.idx++
return true
}
// 必须要添加的消息不接受drop
func (q *messageQueue) mustAdd(msg *message) {
q.mu.Lock()
defer q.mu.Unlock()
q.nodrop = append(q.nodrop, msg)
}
func (q *messageQueue) get() []*message {
q.mu.Lock()
defer q.mu.Unlock()
q.cycle++
sz := q.idx
q.idx = 0
t := q.targetQueue()
q.leftInWrite = !q.leftInWrite
q.gc()
q.oldIdx = sz
if q.rl.Enabled() {
q.rl.Set(0)
}
if len(q.nodrop) == 0 {
return t[:sz]
}
var result []*message
if len(q.nodrop) > 0 {
ssm := q.nodrop
q.nodrop = make([]*message, 0)
result = append(result, ssm...)
}
return append(result, t[:sz]...)
}
func (q *messageQueue) targetQueue() []*message {
var t []*message
if q.leftInWrite {
t = q.left
} else {
t = q.right
}
return t
}
func (q *messageQueue) tryAdd(msg *message) bool {
if !q.rl.Enabled() {
return true
}
if q.rl.RateLimited() {
q.Warn("rate limited dropped", zap.Uint8("msgType", msg.msgType.Uint8()))
return false
}
q.rl.Increase(uint64(msg.size()))
return true
}
func (q *messageQueue) gc() {
if q.lazyFreeCycle > 0 {
oldq := q.targetQueue()
if q.lazyFreeCycle == 1 {
for i := uint64(0); i < q.oldIdx; i++ {
oldq[i].data = nil
}
} else if q.cycle%q.lazyFreeCycle == 0 {
for i := uint64(0); i < q.size; i++ {
oldq[i].data = nil
}
}
}
}
func (q *messageQueue) notify() {
if q.ch != nil {
select {
case q.ch <- struct{}{}:
default:
}
}
}
// Ch returns the notification channel.
func (q *messageQueue) notifyCh() <-chan struct{} {
return q.ch
}