-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhistogram.go
352 lines (308 loc) · 8.82 KB
/
histogram.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
package metrics
import (
"math"
"strconv"
"sync"
"sync/atomic"
"time"
"go.withmatt.com/metrics/internal/atomicx"
)
const (
e10Min = -9
e10Max = 18
bucketsPerDecimal = 18
decimalBucketsCount = e10Max - e10Min
// histBuckets is number of buckets within a single histogram.
histBuckets = decimalBucketsCount * bucketsPerDecimal
// totalBuckets is histogram buckets + lower and upper buckets
totalBuckets = histBuckets + 2
// maxNumSeries is maximum possible series that can be emitted
// by a single histogram, this includes all buckets and the
// sum and count summaries
maxNumSeries = totalBuckets + 2
)
var (
bucketMultiplier = math.Pow(10, 1.0/bucketsPerDecimal)
bucketRanges [totalBuckets]string
)
type (
decimalBucket = [bucketsPerDecimal]atomic.Uint64
atomicDecimalBucket = atomic.Pointer[decimalBucket]
)
// NewHistogram creates a new Histogram on the global Set.
// See [Set.NewHistogram].
func NewHistogram(family string, tags ...string) *Histogram {
return defaultSet.NewHistogram(family, tags...)
}
// NewHistogram creates and returns new Histogram in s with the given name.
//
// family must be a Prometheus compatible identifier format.
//
// Optional tags must be specified in [label, value] pairs, for instance,
//
// NewHistogram("family", "label1", "value1", "label2", "value2")
//
// The returned Histogram is safe to use from concurrent goroutines.
//
// This will panic if values are invalid or already registered.
func (s *Set) NewHistogram(family string, tags ...string) *Histogram {
h := &Histogram{}
s.mustStoreMetric(h, MetricName{
Family: MustIdent(family),
Tags: MustTags(tags...),
})
return h
}
// Histogram is a histogram for non-negative values with automatically created buckets.
//
// See https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350
//
// Each bucket contains a counter for values in the given range.
// Each non-empty bucket is exposed via the following metric:
//
// <metric_name>_bucket{<optional_tags>,vmrange="<start>...<end>"} <counter>
//
// Where:
//
// - <metric_name> is the metric name passed to NewHistogram
// - <optional_tags> is optional tags for the <metric_name>, which are passed to NewHistogram
// - <start> and <end> - start and end values for the given bucket
// - <counter> - the number of hits to the given bucket during Update* calls
//
// Histogram buckets can be converted to Prometheus-like buckets with `le` labels
// with `prometheus_buckets(<metric_name>_bucket)` function from PromQL extensions in VictoriaMetrics.
// (see https://docs.victoriametrics.com/metricsql/ ):
//
// prometheus_buckets(request_duration_bucket)
//
// Time series produced by the Histogram have better compression ratio comparing to
// Prometheus histogram buckets with `le` labels, since they don't include counters
// for all the previous buckets.
//
// If you would like Prometheus style histogram buckets, see [FixedHistogram].
//
// Zero histogram is usable.
type Histogram struct {
// buckets contains counters for histogram buckets
buckets [decimalBucketsCount]atomicDecimalBucket
// lower is the number of values, which hit the lower bucket
lower atomic.Uint64
// upper is the number of values, which hit the upper bucket
upper atomic.Uint64
// sum is the sum of all the values put into Histogram
sum atomicx.Sum
}
// Reset resets the given histogram.
func (h *Histogram) Reset() {
clear(h.buckets[:])
h.lower.Store(0)
h.upper.Store(0)
h.sum.Reset()
}
// Update updates h with val.
//
// Negative values and NaNs are ignored.
func (h *Histogram) Update(val float64) {
if math.IsNaN(val) || val < 0 {
// Skip NaNs and negative values.
return
}
bucketIdx := (math.Log10(val) - e10Min) * bucketsPerDecimal
switch {
case bucketIdx < 0:
h.lower.Add(1)
return
case bucketIdx >= histBuckets:
h.upper.Add(1)
default:
idx := uint(bucketIdx)
if bucketIdx == float64(idx) && idx > 0 {
// Edge case for 10^n values, which must go to the lower bucket
// according to Prometheus logic for `le`-based histograms.
idx--
}
decimalBucketIdx := idx / bucketsPerDecimal
offset := idx % bucketsPerDecimal
db := h.buckets[decimalBucketIdx].Load()
if db == nil {
// this bucket doesn't exist yet
var dbNew decimalBucket
if h.buckets[decimalBucketIdx].CompareAndSwap(db, &dbNew) {
db = &dbNew
} else {
db = h.buckets[decimalBucketIdx].Load()
}
}
db[offset].Add(1)
}
h.sum.Add(val)
}
// Observe updates h with val, identical to [Histogram.Update].
//
// Negative values and NaNs are ignored.
func (h *Histogram) Observe(val float64) {
h.Update(val)
}
// Merge merges src to h.
func (h *Histogram) Merge(src *Histogram) {
h.lower.Add(src.lower.Load())
h.upper.Add(src.upper.Load())
h.sum.Add(src.sum.Load())
for i := range src.buckets {
if dbSrc := src.buckets[i].Load(); dbSrc != nil {
dbDst := h.buckets[i].Load()
if dbDst == nil {
// this bucket doesn't exist yet
var dbNew decimalBucket
if h.buckets[i].CompareAndSwap(dbDst, &dbNew) {
dbDst = &dbNew
} else {
dbDst = h.buckets[i].Load()
}
}
for j := range dbSrc {
dbDst[j].Add(dbSrc[j].Load())
}
}
}
}
// UpdateDuration updates request duration based on the given startTime.
func (h *Histogram) UpdateDuration(startTime time.Time) {
h.Update(time.Since(startTime).Seconds())
}
func (h *Histogram) marshalTo(w ExpfmtWriter, name MetricName) {
card := punchCardPool.Get().(*punchCard)
defer func() {
clear(card[:])
punchCardPool.Put(card)
}()
totalCounts, punches := h.punchBuckets(card)
if totalCounts == 0 {
return
}
sum := h.sum.Load()
family := name.Family.String()
// 1 extra because we're always adding in the vmrange tag
// and sizeOfTags doesn't include a trailing comma
tagsSize := sizeOfTags(name.Tags, w.constantTags) + 1
const (
chunkVMRange = `_bucket{vmrange="`
chunkSum = "_sum"
chunkCount = "_count"
)
// we need the underlying bytes.Buffer
b := w.b
// this is trying to compute up front how much we'll need to write
// below with some margin of error to make sure we allocate enough
// since we can't compute exactly
b.Grow(
(len(family) * punches) +
(tagsSize * punches) +
(len(chunkVMRange) * punches) +
punches +
len(family) + len(chunkSum) + tagsSize + 3 +
len(family) + len(chunkCount) + tagsSize + 3 +
64, // extra margin of error
)
// Write each `_bucket` count metric
// This ultimately constructs a line such as:
// foo_bucket{vmrange="...",foo="bar"} 5
for idx, count := range card {
if count > 0 {
vmrange := bucketRanges[idx]
b.WriteString(family)
b.WriteString(chunkVMRange)
b.WriteString(vmrange)
b.WriteByte('"')
if len(w.constantTags) > 0 {
b.WriteByte(',')
b.WriteString(w.constantTags)
}
for _, tag := range name.Tags {
b.WriteByte(',')
writeTag(b, tag)
}
b.WriteString(`} `)
writeUint64(b, count)
b.WriteByte('\n')
}
}
// Write our `_sum` line
// This ultimately constructs a line such as:
// foo_sum{foo="bar"} 5
b.WriteString(family)
b.WriteString(chunkSum)
if tagsSize > 0 {
b.WriteByte('{')
writeTags(b, w.constantTags, name.Tags)
b.WriteByte('}')
}
b.WriteByte(' ')
writeFloat64(b, sum)
b.WriteByte('\n')
// Write our `_count` line
// This ultimately constructs a line such as:
// foo_count{foo="bar"} 5
b.WriteString(family)
b.WriteString(chunkCount)
if tagsSize > 0 {
b.WriteByte('{')
writeTags(b, w.constantTags, name.Tags)
b.WriteByte('}')
}
b.WriteByte(' ')
writeUint64(b, totalCounts)
b.WriteByte('\n')
}
// punchBuckets marks the counts on the punchCard corresponding to which
// histogram buckets have counts.
func (h *Histogram) punchBuckets(c *punchCard) (total uint64, punches int) {
if count := h.lower.Load(); count > 0 {
c[0] = count
total += count
punches++
}
if count := h.upper.Load(); count > 0 {
c[len(c)-1] = count
total += count
punches++
}
for idx := range h.buckets {
if db := h.buckets[idx].Load(); db != nil {
for offset := range db {
if count := db[offset].Load(); count > 0 {
bucketIdx := idx*bucketsPerDecimal + offset
c[bucketIdx+1] = count
total += count
punches++
}
}
}
}
return
}
// punchCard is used internally to track counts per bucket when computing
// which histograms ranges have been hit.
type punchCard [totalBuckets]uint64
var punchCardPool = sync.Pool{
New: func() any {
var c punchCard
return &c
},
}
func init() {
// pre-compute all bucket ranges
v := math.Pow10(e10Min)
bucketRanges[0] = "0..." + formatBucket(v)
start := formatBucket(v)
for i := range histBuckets {
v *= bucketMultiplier
end := formatBucket(v)
bucketRanges[i+1] = start + "..." + end
start = end
}
bucketRanges[totalBuckets-1] = formatBucket(math.Pow10(e10Max)) + "...+Inf"
}
func formatBucket(v float64) string {
return strconv.FormatFloat(v, 'e', 3, 64)
}