@@ -37,15 +37,11 @@ type LDBDatabase struct {
37
37
fn string // filename for reporting
38
38
db * leveldb.DB // LevelDB instance
39
39
40
- getTimer metrics.Timer // Timer for measuring the database get request counts and latencies
41
- putTimer metrics.Timer // Timer for measuring the database put request counts and latencies
42
- delTimer metrics.Timer // Timer for measuring the database delete request counts and latencies
43
- missMeter metrics.Meter // Meter for measuring the missed database get requests
44
- readMeter metrics.Meter // Meter for measuring the database get request data usage
45
- writeMeter metrics.Meter // Meter for measuring the database put request data usage
46
40
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
47
41
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
48
42
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
43
+ diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
44
+ diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
49
45
50
46
quitLock sync.Mutex // Mutex protecting the quit channel access
51
47
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
@@ -94,16 +90,9 @@ func (db *LDBDatabase) Path() string {
94
90
95
91
// Put puts the given key / value to the queue
96
92
func (db * LDBDatabase ) Put (key []byte , value []byte ) error {
97
- // Measure the database put latency, if requested
98
- if db .putTimer != nil {
99
- defer db .putTimer .UpdateSince (time .Now ())
100
- }
101
93
// Generate the data to write to disk, update the meter and write
102
94
//value = rle.Compress(value)
103
95
104
- if db .writeMeter != nil {
105
- db .writeMeter .Mark (int64 (len (value )))
106
- }
107
96
return db .db .Put (key , value , nil )
108
97
}
109
98
@@ -113,32 +102,17 @@ func (db *LDBDatabase) Has(key []byte) (bool, error) {
113
102
114
103
// Get returns the given key if it's present.
115
104
func (db * LDBDatabase ) Get (key []byte ) ([]byte , error ) {
116
- // Measure the database get latency, if requested
117
- if db .getTimer != nil {
118
- defer db .getTimer .UpdateSince (time .Now ())
119
- }
120
105
// Retrieve the key and increment the miss counter if not found
121
106
dat , err := db .db .Get (key , nil )
122
107
if err != nil {
123
- if db .missMeter != nil {
124
- db .missMeter .Mark (1 )
125
- }
126
108
return nil , err
127
109
}
128
- // Otherwise update the actually retrieved amount of data
129
- if db .readMeter != nil {
130
- db .readMeter .Mark (int64 (len (dat )))
131
- }
132
110
return dat , nil
133
111
//return rle.Decompress(dat)
134
112
}
135
113
136
114
// Delete deletes the key from the queue and database
137
115
func (db * LDBDatabase ) Delete (key []byte ) error {
138
- // Measure the database delete latency, if requested
139
- if db .delTimer != nil {
140
- defer db .delTimer .UpdateSince (time .Now ())
141
- }
142
116
// Execute the actual operation
143
117
return db .db .Delete (key , nil )
144
118
}
@@ -178,15 +152,11 @@ func (db *LDBDatabase) Meter(prefix string) {
178
152
return
179
153
}
180
154
// Initialize all the metrics collector at the requested prefix
181
- db .getTimer = metrics .NewRegisteredTimer (prefix + "user/gets" , nil )
182
- db .putTimer = metrics .NewRegisteredTimer (prefix + "user/puts" , nil )
183
- db .delTimer = metrics .NewRegisteredTimer (prefix + "user/dels" , nil )
184
- db .missMeter = metrics .NewRegisteredMeter (prefix + "user/misses" , nil )
185
- db .readMeter = metrics .NewRegisteredMeter (prefix + "user/reads" , nil )
186
- db .writeMeter = metrics .NewRegisteredMeter (prefix + "user/writes" , nil )
187
155
db .compTimeMeter = metrics .NewRegisteredMeter (prefix + "compact/time" , nil )
188
156
db .compReadMeter = metrics .NewRegisteredMeter (prefix + "compact/input" , nil )
189
157
db .compWriteMeter = metrics .NewRegisteredMeter (prefix + "compact/output" , nil )
158
+ db .diskReadMeter = metrics .NewRegisteredMeter (prefix + "disk/read" , nil )
159
+ db .diskWriteMeter = metrics .NewRegisteredMeter (prefix + "disk/write" , nil )
190
160
191
161
// Create a quit channel for the periodic collector and run it
192
162
db .quitLock .Lock ()
@@ -207,12 +177,17 @@ func (db *LDBDatabase) Meter(prefix string) {
207
177
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
208
178
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
209
179
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
180
+ //
181
+ // This is how the iostats look like (currently):
182
+ // Read(MB):3895.04860 Write(MB):3654.64712
210
183
func (db * LDBDatabase ) meter (refresh time.Duration ) {
211
- // Create the counters to store current and previous values
212
- counters := make ([][]float64 , 2 )
184
+ // Create the counters to store current and previous compaction values
185
+ compactions := make ([][]float64 , 2 )
213
186
for i := 0 ; i < 2 ; i ++ {
214
- counters [i ] = make ([]float64 , 3 )
187
+ compactions [i ] = make ([]float64 , 3 )
215
188
}
189
+ // Create storage for iostats.
190
+ var iostats [2 ]float64
216
191
// Iterate ad infinitum and collect the stats
217
192
for i := 1 ; ; i ++ {
218
193
// Retrieve the database stats
@@ -233,8 +208,8 @@ func (db *LDBDatabase) meter(refresh time.Duration) {
233
208
lines = lines [3 :]
234
209
235
210
// Iterate over all the table rows, and accumulate the entries
236
- for j := 0 ; j < len (counters [i % 2 ]); j ++ {
237
- counters [i % 2 ][j ] = 0
211
+ for j := 0 ; j < len (compactions [i % 2 ]); j ++ {
212
+ compactions [i % 2 ][j ] = 0
238
213
}
239
214
for _ , line := range lines {
240
215
parts := strings .Split (line , "|" )
@@ -247,19 +222,60 @@ func (db *LDBDatabase) meter(refresh time.Duration) {
247
222
db .log .Error ("Compaction entry parsing failed" , "err" , err )
248
223
return
249
224
}
250
- counters [i % 2 ][idx ] += value
225
+ compactions [i % 2 ][idx ] += value
251
226
}
252
227
}
253
228
// Update all the requested meters
254
229
if db .compTimeMeter != nil {
255
- db .compTimeMeter .Mark (int64 ((counters [i % 2 ][0 ] - counters [(i - 1 )% 2 ][0 ]) * 1000 * 1000 * 1000 ))
230
+ db .compTimeMeter .Mark (int64 ((compactions [i % 2 ][0 ] - compactions [(i - 1 )% 2 ][0 ]) * 1000 * 1000 * 1000 ))
256
231
}
257
232
if db .compReadMeter != nil {
258
- db .compReadMeter .Mark (int64 ((counters [i % 2 ][1 ] - counters [(i - 1 )% 2 ][1 ]) * 1024 * 1024 ))
233
+ db .compReadMeter .Mark (int64 ((compactions [i % 2 ][1 ] - compactions [(i - 1 )% 2 ][1 ]) * 1024 * 1024 ))
259
234
}
260
235
if db .compWriteMeter != nil {
261
- db .compWriteMeter .Mark (int64 ((counters [i % 2 ][2 ] - counters [(i - 1 )% 2 ][2 ]) * 1024 * 1024 ))
236
+ db .compWriteMeter .Mark (int64 ((compactions [i % 2 ][2 ] - compactions [(i - 1 )% 2 ][2 ]) * 1024 * 1024 ))
237
+ }
238
+
239
+ // Retrieve the database iostats.
240
+ ioStats , err := db .db .GetProperty ("leveldb.iostats" )
241
+ if err != nil {
242
+ db .log .Error ("Failed to read database iostats" , "err" , err )
243
+ return
244
+ }
245
+ parts := strings .Split (ioStats , " " )
246
+ if len (parts ) < 2 {
247
+ db .log .Error ("Bad syntax of ioStats" , "ioStats" , ioStats )
248
+ return
262
249
}
250
+ r := strings .Split (parts [0 ], ":" )
251
+ if len (r ) < 2 {
252
+ db .log .Error ("Bad syntax of read entry" , "entry" , parts [0 ])
253
+ return
254
+ }
255
+ read , err := strconv .ParseFloat (r [1 ], 64 )
256
+ if err != nil {
257
+ db .log .Error ("Read entry parsing failed" , "err" , err )
258
+ return
259
+ }
260
+ w := strings .Split (parts [1 ], ":" )
261
+ if len (w ) < 2 {
262
+ db .log .Error ("Bad syntax of write entry" , "entry" , parts [1 ])
263
+ return
264
+ }
265
+ write , err := strconv .ParseFloat (w [1 ], 64 )
266
+ if err != nil {
267
+ db .log .Error ("Write entry parsing failed" , "err" , err )
268
+ return
269
+ }
270
+ if db .diskReadMeter != nil {
271
+ db .diskReadMeter .Mark (int64 ((read - iostats [0 ]) * 1024 * 1024 ))
272
+ }
273
+ if db .diskWriteMeter != nil {
274
+ db .diskWriteMeter .Mark (int64 ((write - iostats [1 ]) * 1024 * 1024 ))
275
+ }
276
+ iostats [0 ] = read
277
+ iostats [1 ] = write
278
+
263
279
// Sleep a bit, then repeat the stats collection
264
280
select {
265
281
case errc := <- db .quitChan :
0 commit comments