-
Notifications
You must be signed in to change notification settings - Fork 164
/
Copy paththrottler.go
170 lines (149 loc) · 4.89 KB
/
throttler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package flow
import (
"fmt"
"sync/atomic"
"time"
"github.com/reugn/go-streams"
)
// ThrottleMode defines the behavior of the Throttler when its internal buffer is full.
type ThrottleMode int8
const (
// Backpressure instructs the Throttler to block upstream ingestion when its internal
// buffer is full. This effectively slows down the producer, preventing data loss
// and ensuring all elements are eventually processed, albeit at a reduced rate. This
// mode can cause upstream operations to block indefinitely if the downstream consumer
// cannot keep up.
Backpressure ThrottleMode = iota
// Discard instructs the Throttler to drop incoming elements when its internal buffer
// is full. This mode prioritizes maintaining the target throughput rate, even at the
// cost of data loss. Elements are silently dropped without any indication to the
// upstream producer. Use this mode when data loss is acceptable.
Discard
)
// Throttler limits the throughput to a specific number of elements per time unit.
type Throttler struct {
period time.Duration
mode ThrottleMode
maxElements int64
counter atomic.Int64
in chan any
out chan any
quotaSignal chan struct{}
done chan struct{}
}
// Verify Throttler satisfies the Flow interface.
var _ streams.Flow = (*Throttler)(nil)
// NewThrottler returns a new Throttler operator.
//
// The Throttler operator limits the rate at which elements are produced. It allows a
// maximum of 'elements' number of elements to be processed within a specified 'period'
// of time.
//
// elements is the maximum number of elements to be produced per the given period of time.
// bufferSize is the size of the internal buffer for incoming elements. This buffer
// temporarily holds elements waiting to be processed.
// mode specifies the processing behavior when the internal elements buffer is full.
// See [ThrottleMode] for available options.
//
// If elements or bufferSize are not positive, or if mode is not a supported
// ThrottleMode, NewThrottler will panic.
func NewThrottler(elements int, period time.Duration, bufferSize int, mode ThrottleMode) *Throttler {
if elements < 1 {
panic(fmt.Sprintf("nonpositive elements number: %d", elements))
}
if bufferSize < 1 {
panic(fmt.Sprintf("nonpositive buffer size: %d", bufferSize))
}
if mode != Discard && mode != Backpressure {
panic(fmt.Sprintf("unsupported ThrottleMode: %d", mode))
}
throttler := &Throttler{
maxElements: int64(elements),
period: period,
mode: mode,
in: make(chan any),
out: make(chan any, bufferSize),
quotaSignal: make(chan struct{}),
done: make(chan struct{}),
}
go throttler.resetQuotaCounterLoop()
go throttler.buffer()
return throttler
}
// quotaExceeded checks whether the quota per time unit has been exceeded.
func (th *Throttler) quotaExceeded() bool {
return th.counter.Load() >= th.maxElements
}
// resetQuotaCounterLoop resets the throttler quota counter every th.period
// and notifies the downstream processing goroutine of the quota reset.
func (th *Throttler) resetQuotaCounterLoop() {
ticker := time.NewTicker(th.period)
defer ticker.Stop()
for {
select {
case <-ticker.C:
th.counter.Store(0)
th.notifyQuotaReset() // send quota reset
case <-th.done:
return
}
}
}
// notifyQuotaReset notifies the downstream processor with quota reset.
func (th *Throttler) notifyQuotaReset() {
select {
case th.quotaSignal <- struct{}{}:
default:
}
}
// buffer buffers incoming elements from the in channel by sending them
// to the out channel, adhering to the configured ThrottleMode.
func (th *Throttler) buffer() {
switch th.mode {
case Discard:
for element := range th.in {
select {
case th.out <- element:
default:
}
}
case Backpressure:
for element := range th.in {
th.out <- element
}
}
close(th.out)
}
// Via asynchronously streams data to the given Flow and returns it.
func (th *Throttler) Via(flow streams.Flow) streams.Flow {
go th.streamPortioned(flow)
return flow
}
// To streams data to the given Sink and blocks until the Sink has completed
// processing all data.
func (th *Throttler) To(sink streams.Sink) {
th.streamPortioned(sink)
sink.AwaitCompletion()
}
// Out returns the output channel of the Throttler operator.
func (th *Throttler) Out() <-chan any {
return th.out
}
// In returns the input channel of the Throttler operator.
func (th *Throttler) In() chan<- any {
return th.in
}
// streamPortioned streams elements to the given Inlet, enforcing a quota.
// Elements are sent to inlet.In() until th.out is closed. If the quota is exceeded,
// the function blocks until a quota reset signal is received on th.quotaSignal.
func (th *Throttler) streamPortioned(inlet streams.Inlet) {
for element := range th.out {
if th.quotaExceeded() {
<-th.quotaSignal // wait for quota reset
}
th.counter.Add(1)
inlet.In() <- element
}
close(th.done)
close(inlet.In())
}