forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathretention_test.go
181 lines (153 loc) · 5.38 KB
/
retention_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package storage
import (
"context"
"math"
"math/rand"
"reflect"
"testing"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/prom/promtest"
"github.com/influxdata/influxdb/tsdb"
"github.com/prometheus/client_golang/prometheus"
)
// TestRetentionService verifies that the retention enforcer deletes data
// only for buckets with a positive retention period: buckets with a 3h
// retention must produce exactly one DeleteBucketRange call covering
// [math.MinInt64, now-3h], while buckets that are absent or have a zero
// retention period must never be deleted.
func TestRetentionService(t *testing.T) {
	engine := NewTestEngine()
	service := newRetentionEnforcer(engine, NewTestBucketFinder())
	now := time.Date(2018, 4, 10, 23, 12, 33, 0, time.UTC)

	// Expiring with no buckets at all must be a safe no-op.
	t.Run("no buckets", func(t *testing.T) {
		service.expireData(context.Background(), nil, now)
		service.expireData(context.Background(), []*influxdb.Bucket{}, now)
	})

	// Generate some buckets to expire.
	buckets := []*influxdb.Bucket{}
	expMatched := map[string]struct{}{}  // To be used for verifying test results.
	expRejected := map[string]struct{}{} // To be used for verifying test results.
	for i := 0; i < 15; i++ {
		name := genMeasurementName()

		var n [16]byte
		copy(n[:], name)
		orgID, bucketID := tsdb.DecodeName(n)

		// Put 1/3rd in the rpByBucketID into the set to delete and 1/3rd into the set
		// to not delete because no rp, and 1/3rd into the set to not delete because 0 rp.
		if i%3 == 0 {
			buckets = append(buckets, &influxdb.Bucket{
				OrgID:           orgID,
				ID:              bucketID,
				RetentionPeriod: 3 * time.Hour,
			})
			expMatched[string(name)] = struct{}{}
		} else if i%3 == 1 {
			expRejected[string(name)] = struct{}{}
		} else if i%3 == 2 {
			buckets = append(buckets, &influxdb.Bucket{
				OrgID:           orgID,
				ID:              bucketID,
				RetentionPeriod: 0,
			})
			expRejected[string(name)] = struct{}{}
		}
	}

	gotMatched := map[string]struct{}{}

	engine.DeleteBucketRangeFn = func(orgID, bucketID influxdb.ID, from, to int64) error {
		if from != math.MinInt64 {
			// int64 conversion is required: passing the untyped math.MinInt64
			// constant to a ...interface{} parameter overflows int on 32-bit
			// platforms and fails to compile there.
			t.Fatalf("got from %d, expected %d", from, int64(math.MinInt64))
		}
		wantTo := now.Add(-3 * time.Hour).UnixNano()
		if to != wantTo {
			t.Fatalf("got to %d, expected %d", to, wantTo)
		}

		name := tsdb.EncodeName(orgID, bucketID)
		// A delete for any bucket in the rejected set is a bug in the enforcer.
		if _, ok := expRejected[string(name[:])]; ok {
			t.Fatalf("got a delete for %x", name)
		}

		gotMatched[string(name[:])] = struct{}{}
		return nil
	}

	t.Run("multiple buckets", func(t *testing.T) {
		service.expireData(context.Background(), buckets, now)
		if !reflect.DeepEqual(gotMatched, expMatched) {
			t.Fatalf("got\n%#v\nexpected\n%#v", gotMatched, expMatched)
		}
	})
}
// TestMetrics_Retention checks that two retention trackers sharing a single
// set of metrics report the expected counter and histogram values under
// their respective engine/node label sets.
func TestMetrics_Retention(t *testing.T) {
	// Metrics shared by multiple file stores.
	shared := newRetentionMetrics(prometheus.Labels{"engine_id": "", "node_id": ""})
	trackerA := newRetentionTracker(shared, prometheus.Labels{"engine_id": "0", "node_id": "0"})
	trackerB := newRetentionTracker(shared, prometheus.Labels{"engine_id": "1", "node_id": "0"})

	registry := prometheus.NewRegistry()
	registry.MustRegister(shared.PrometheusCollectors()...)

	base := namespace + "_" + retentionSubsystem + "_"

	// Record one successful and one failed check (plus durations) per tracker.
	for i, tracker := range []*retentionTracker{trackerA, trackerB} {
		id := influxdb.ID(i + 1)
		tracker.IncChecks(id, id, true)
		tracker.IncChecks(id, id, false)
		tracker.CheckDuration(time.Second, true)
		tracker.CheckDuration(time.Second, false)
	}

	// Gather everything registered and verify the expected series exist.
	families, err := registry.Gather()
	if err != nil {
		t.Fatal(err)
	}

	// The label variants for the two trackers.
	variants := []prometheus.Labels{
		{"engine_id": "0", "node_id": "0"},
		{"engine_id": "1", "node_id": "0"},
	}

	for i, labels := range variants {
		for _, status := range []string{"ok", "error"} {
			labels["status"] = status

			// The checks counter additionally carries org/bucket labels, so
			// build an extended copy and keep `labels` untouched for the
			// histogram lookup below.
			withIDs := make(prometheus.Labels, len(labels))
			for k, v := range labels {
				withIDs[k] = v
			}
			withIDs["org_id"] = influxdb.ID(i + 1).String()
			withIDs["bucket_id"] = influxdb.ID(i + 1).String()

			name := base + "checks_total"
			metric := promtest.MustFindMetric(t, families, name, withIDs)
			if got, exp := metric.GetCounter().GetValue(), float64(1); got != exp {
				t.Errorf("[%s %d %v] got %v, expected %v", name, i, withIDs, got, exp)
			}

			name = base + "check_duration_seconds"
			metric = promtest.MustFindMetric(t, families, name, labels)
			if got, exp := metric.GetHistogram().GetSampleSum(), float64(1); got != exp {
				t.Errorf("[%s %d %v] got %v, expected %v", name, i, labels, got, exp)
			}
		}
	}
}
// genMeasurementName returns 16 random bytes suitable for use as an encoded
// org+bucket measurement name, panicking if the random source fails.
func genMeasurementName() []byte {
	name := make([]byte, 16)
	if _, err := rand.Read(name); err != nil {
		panic(err)
	}
	return name
}
// TestEngine is a mock storage engine; tests override DeleteBucketRangeFn
// to observe or verify delete calls made by the code under test.
type TestEngine struct {
	// DeleteBucketRangeFn is invoked by DeleteBucketRange with
	// (orgID, bucketID, min, max).
	DeleteBucketRangeFn func(influxdb.ID, influxdb.ID, int64, int64) error
}
// NewTestEngine returns a TestEngine whose DeleteBucketRangeFn is a no-op
// that always succeeds.
func NewTestEngine() *TestEngine {
	engine := &TestEngine{}
	engine.DeleteBucketRangeFn = func(influxdb.ID, influxdb.ID, int64, int64) error {
		return nil
	}
	return engine
}
// DeleteBucketRange delegates to the configurable DeleteBucketRangeFn.
func (e *TestEngine) DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64) error {
	return e.DeleteBucketRangeFn(orgID, bucketID, min, max)
}
// TestBucketFinder is a mock bucket lookup service; tests override
// FindBucketsFn to control which buckets are returned.
type TestBucketFinder struct {
	// FindBucketsFn is invoked by FindBuckets with the caller's
	// context, filter, and options.
	FindBucketsFn func(context.Context, influxdb.BucketFilter, ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)
}
// NewTestBucketFinder returns a TestBucketFinder whose FindBucketsFn
// reports zero buckets and no error.
func NewTestBucketFinder() *TestBucketFinder {
	finder := &TestBucketFinder{}
	finder.FindBucketsFn = func(context.Context, influxdb.BucketFilter, ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
		return nil, 0, nil
	}
	return finder
}
// FindBuckets delegates to the configurable FindBucketsFn.
func (f *TestBucketFinder) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opts ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
	return f.FindBucketsFn(ctx, filter, opts...)
}