-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexporter.go
172 lines (155 loc) · 4.25 KB
/
exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package main
import (
"context"
"errors"
"fmt"
"log/slog"
"math/rand/v2"
"time"
arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
"github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.opentelemetry.io/collector/pdata/pmetric"
)
type (
	// Producer appends metrics to the slice it is handed. The exporter calls
	// it once per report and gives it the metric slice of a freshly created
	// scope.
	Producer interface {
		Produce(pmetric.MetricSlice) error
	}

	// ProducerConfig pairs a Producer with the instrumentation scope name its
	// metrics are reported under.
	ProducerConfig struct {
		Producer  Producer
		ScopeName string
	}

	// stream bundles an open ArrowMetrics gRPC stream with the Arrow record
	// producer used to encode the batches sent on it.
	stream struct {
		endpoint      arrowpb.ArrowMetricsService_ArrowMetricsClient
		arrowProducer *arrow_record.Producer
	}

	// Exporter periodically collects metrics from its registered producers
	// and ships them to an ArrowMetricsService over a streaming gRPC call.
	Exporter struct {
		client arrowpb.ArrowMetricsServiceClient
		// NB: someday we might want to have several producer groups,
		// each of which collects at different intervals.
		// For now we are only collecting one scope (GPU metrics)
		// so the global interval is fine.
		interval      time.Duration
		producers     []ProducerConfig
		resourceAttrs map[string]any
		// stream is nil when no stream is open; it is (re)created on demand.
		stream *stream
	}
)
func NewExporter(client arrowpb.ArrowMetricsServiceClient, interval time.Duration, resourceAttrs map[string]any) Exporter {
return Exporter{
client: client,
interval: interval,
resourceAttrs: resourceAttrs,
stream: nil,
}
}
// AddProducer registers cfg so that every subsequent report includes a scope
// named cfg.ScopeName filled by cfg.Producer. The exporter does no locking,
// so call this before Start rather than concurrently with it.
func (e *Exporter) AddProducer(cfg ProducerConfig) {
	updated := append(e.producers, cfg)
	e.producers = updated
}
// makeStream opens a new ArrowMetrics stream on e.client and installs it,
// together with a fresh Arrow record producer, as e.stream. If opening the
// stream fails, e.stream is left unchanged and the error is returned.
func (e *Exporter) makeStream(ctx context.Context) error {
	slog.Debug("making new stream")
	conn, openErr := e.client.ArrowMetrics(ctx)
	if openErr != nil {
		return openErr
	}
	e.stream = &stream{endpoint: conn, arrowProducer: arrow_record.NewProducer()}
	return nil
}
// report collects one batch of metrics from every registered producer and
// sends it to the server as a single Arrow batch, then waits for the server's
// batch status.
//
// Stream handling:
//   - A failure to open a stream is returned immediately, without retry.
//   - An encode (produce) or Send failure discards the current stream and
//     retries once on a fresh stream. This absorbs the server legitimately
//     closing the stream, e.g. because it enforces max_connection_age.
//   - A Recv failure discards the stream, so the next call reconnects. It is
//     returned rather than retried because the batch has already been sent.
//   - A non-OK batch status is returned as an error.
func (e *Exporter) report(ctx context.Context) error {
	m, err := e.collectMetrics()
	if err != nil {
		return err
	}
	dpc := m.DataPointCount()
	slog.Debug("About to report arrow metrics", "data points", dpc)

	const maxAttempts = 2
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if e.stream == nil {
			// Don't retry stream creation: the retry loop exists for
			// streams that go away, not for unexpected dial failures.
			if err = e.makeStream(ctx); err != nil {
				return err
			}
		}

		var batch *arrowpb.BatchArrowRecords
		batch, err = e.stream.arrowProducer.BatchArrowRecordsFromMetrics(m)
		if err != nil {
			slog.Warn("Error on produce", "error", err)
			e.closeStream()
			continue
		}

		start := time.Now()
		if err = e.stream.endpoint.Send(batch); err != nil {
			slog.Warn("Error on send", "error", err)
			e.closeStream()
			continue
		}

		// Use a distinct name so the status check below cannot be fooled by
		// a shadowed, nil err (the original bug).
		status, recvErr := e.stream.endpoint.Recv()
		if recvErr != nil {
			slog.Warn("Error on recv", "error", recvErr)
			e.closeStream()
			return recvErr
		}
		if code := status.GetStatusCode(); code != arrowpb.StatusCode_OK {
			slog.Warn("unexpected status code",
				"status", code,
				"message", status.GetStatusMessage(),
			)
			return fmt.Errorf("arrow metrics batch rejected: status %v: %s", code, status.GetStatusMessage())
		}

		slog.Info("Send succeeded",
			"data points", dpc,
			"duration", time.Since(start),
		)
		return nil
	}
	// Every attempt failed at produce or send; err holds the last failure.
	return err
}

// collectMetrics builds a Metrics value holding one ResourceMetrics (with
// e.resourceAttrs as resource attributes) and one ScopeMetrics per registered
// producer. A producer error is logged and that producer is skipped; whatever
// it appended before failing (if anything) is kept.
func (e *Exporter) collectMetrics() (pmetric.Metrics, error) {
	m := pmetric.NewMetrics()
	r := m.ResourceMetrics().AppendEmpty()
	if err := r.Resource().Attributes().FromRaw(e.resourceAttrs); err != nil {
		return pmetric.Metrics{}, err
	}
	for _, p := range e.producers {
		slog.Debug("Running arrow metrics producer", "scope", p.ScopeName)
		s := r.ScopeMetrics().AppendEmpty()
		s.Scope().SetName(p.ScopeName)
		if err := p.Producer.Produce(s.Metrics()); err != nil {
			slog.Warn("Producer failed to produce metrics", "scope", p.ScopeName, "error", err)
		}
	}
	return m, nil
}

// closeStream discards the current stream, if any. It half-closes the gRPC
// send side and releases the Arrow producer. Cleanup errors are only logged,
// at debug level, because the stream is being thrown away regardless.
func (e *Exporter) closeStream() {
	if e.stream == nil {
		return
	}
	if err := e.stream.endpoint.CloseSend(); err != nil {
		slog.Debug("error closing arrow metrics stream", "error", err)
	}
	if err := e.stream.arrowProducer.Close(); err != nil {
		slog.Debug("error closing arrow producer", "error", err)
	}
	e.stream = nil
}
func (e *Exporter) Start(ctx context.Context) error {
slog.Info("running arrow metrics exporter", "producers", len(e.producers))
if len(e.producers) == 0 {
return errors.New("no producers configured")
}
tick := time.NewTicker(e.interval)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-tick.C:
if err := e.report(ctx); err != nil {
return fmt.Errorf("failed to send arrow metrics: %v", err)
}
tick.Reset(addJitter(e.interval, 0.2))
}
}
}
// addJitter returns baseDuration scaled by a random factor in
// (1-jitter, 1+jitter]; that is, it adds +/- jitter (a fraction in [0, 1]) of
// baseDuration. A jitter outside [0, 1], including NaN, returns baseDuration
// unchanged.
// Originally copied from go.opentelemetry.io/ebpf-profiler.
func addJitter(baseDuration time.Duration, jitter float64) time.Duration {
	// Written as a negated in-range test so NaN, for which every comparison
	// is false, is rejected as well.
	if !(jitter >= 0.0 && jitter <= 1.0) {
		return baseDuration
	}
	//nolint:gosec // non-cryptographic randomness is fine for timer jitter
	return time.Duration((1 + jitter - 2*jitter*rand.Float64()) * float64(baseDuration))
}