-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcollector.go
147 lines (125 loc) · 3.94 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package main
import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"
"time"
_ "github.com/denisenkom/go-mssqldb"
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
// Name components for this exporter. Neither constant is referenced in the
// rest of this file section.
// NOTE(review): presumably these are combined into fully-qualified metric
// names where the descriptors are built — confirm against that code.
const (
namespace = "query"
exporter = "exporter"
)
// QueryCollector is the query exporter's collector. Its Describe and Collect
// methods have the signatures required by the prometheus.Collector interface.
type QueryCollector struct {
// instances are the database instances scraped on every Collect call.
instances Instances
// collects are the queries, with their metric definitions, that are executed
// against each instance during a scrape.
collects []Collect
// StatusDesc describes the per-instance scrape status gauge, which scrape
// sends with the instance name as its single label value.
StatusDesc *prometheus.Desc
}
// Describe has the signature of prometheus.Collector's Describe method. It
// sends no descriptors on ch.
// NOTE(review): client_golang documents a collector that describes nothing as
// "unchecked"; presumably that is intended here because the metric
// descriptors come from configuration — confirm.
func (e *QueryCollector) Describe(ch chan<- *prometheus.Desc) {
}
// Collect has the signature of prometheus.Collector's Collect method. It
// scrapes every configured instance one after another, sending each
// instance's metrics on ch.
func (e *QueryCollector) Collect(ch chan<- prometheus.Metric) {
	for _, inst := range e.instances {
		// scrape takes the Instance by value, so hand it a copy.
		target := *inst
		e.scrape(target, ch)
	}
}
// scrape connects to the instance's database, executes every configured
// collect query against it and sends the resulting metrics on ch.
//
// When scrape returns it always sends a status gauge described by
// e.StatusDesc and labelled with the instance name: 1 if the scrape ran to
// completion, 0 if the database type is unknown, the connection or ping
// failed, or a query could not be executed. Column-lookup, row-scan and
// row-iteration errors are logged but do not change the status.
func (e *QueryCollector) scrape(instance Instance, ch chan<- prometheus.Metric) {
	// Collector status; stays 0 unless the scrape reaches the end.
	var collectStatus float64
	defer func() {
		// %v, not %d: collectStatus is a float64.
		log.Debugf("[%s] collector status: %v", instance.Name, collectStatus)
		ch <- prometheus.MustNewConstMetric(e.StatusDesc, prometheus.GaugeValue, collectStatus, instance.Name)
	}()

	// Look the opener up explicitly: indexing the map with an unknown type
	// yields a nil func, and calling that would panic the whole exporter.
	opener, ok := sqlOpen[instance.Type]
	if !ok {
		log.Errorf("[%s] Unsupported database type: %s", instance.Name, instance.Type)
		return
	}

	// Connect to database
	db, err := opener(instance.DSN)
	if err != nil {
		log.Errorf("[%s] Connect to %s database failed: %s", instance.Name, instance.Type, err)
		return
	}
	defer db.Close()

	// Connection check
	ctx, cancel := context.WithTimeout(context.Background(), defaultQueryTimeout*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		log.Errorf("[%s] Ping to %s database failed: %s", instance.Name, instance.Type, err)
		return
	}

	// Execute collect queries, and make metrics for the result. A query that
	// cannot be executed aborts the scrape and leaves the status at 0.
	for _, collect := range e.collects {
		if !e.runCollect(db, instance, collect, ch) {
			return
		}
	}

	collectStatus = 1
}

// runCollect executes one collect query on db and sends a metric on ch for
// every configured metric of every result row. It returns false only when the
// query itself could not be executed; all later errors are logged and
// reported as true so that the remaining collects still run.
//
// It is a separate method so that the deferred context cancel and rows.Close
// run at the end of each query rather than piling up until scrape returns.
func (e *QueryCollector) runCollect(db *sql.DB, instance Instance, collect Collect, ch chan<- prometheus.Metric) bool {
	log.Debugf("[%s] execute query: %s", instance.Name, collect.Query)

	// Query timeout
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(collect.Timeout)*time.Second)
	defer cancel()

	rows, err := db.QueryContext(ctx, collect.Query)
	if err != nil {
		log.Errorf("[%s] Failed to execute query: %s>> %s", instance.Name, err, collect.Query)
		return false
	}
	// Release the result set on every path, including the early exits below.
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		log.Errorf("[%s] Failed to get column info: %s", instance.Name, err)
		return true
	}
	log.Debugf("[%s] cols - %s", instance.Name, cols)

	// Scan every column as raw bytes; des holds pointers into res so that
	// each Scan call refreshes res in place.
	des := make([]interface{}, len(cols))
	res := make([][]byte, len(cols))
	for i := range cols {
		des[i] = &res[i]
	}

	for rows.Next() {
		if err := rows.Scan(des...); err != nil {
			log.Errorf("[%s] row scan error, break rows.Next(): %s", instance.Name, err)
			break
		}

		// Column name -> value for this row, plus the synthetic "instance" entry.
		data := make(map[string]string, len(cols)+1)
		for i, raw := range res {
			data[cols[i]] = string(raw)
		}
		data["instance"] = instance.Name

		for _, metric := range collect.Metrics {
			log.Debugf("[%s] metric labels: %s", instance.Name, metric.metricDesc)
			labelVals := make([]string, 0, len(metric.Labels))
			for _, label := range metric.Labels {
				labelVals = append(labelVals, data[label])
			}
			log.Debugf("[%s] metric values: %s", instance.Name, labelVals)

			val, err := strconv.ParseFloat(data[metric.Value], 64)
			if err != nil {
				// Keep emitting 0 as before (an empty or non-numeric column
				// has always produced 0), but no longer hide the problem.
				log.Warnf("[%s] value column %q is not a number (%q), using 0: %s", instance.Name, metric.Value, data[metric.Value], err)
			}

			switch strings.ToLower(metric.Type) {
			case "counter":
				ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.CounterValue, val, labelVals...)
			case "gauge":
				ch <- prometheus.MustNewConstMetric(metric.metricDesc, prometheus.GaugeValue, val, labelVals...)
			default:
				log.Errorf("[%s] Metric type support only counter|gauge, skip", instance.Name)
			}
		}
	}

	// rows.Next() also returns false on an iteration error (for example the
	// query timeout expiring mid-stream); report it instead of dropping it.
	if err := rows.Err(); err != nil {
		log.Errorf("[%s] row iteration error: %s", instance.Name, err)
	}
	return true
}
// sqlOpen maps a database type name to the function that opens a connection
// pool for a DSN. "mysql" and "postgres" are opened through database/sql
// using the driver of the same name; "mssql" and "sqlite" are listed but
// always return a "not support yet" error.
var sqlOpen = func() map[string]func(dsn string) (*sql.DB, error) {
	openers := make(map[string]func(dsn string) (*sql.DB, error), 4)

	// Types whose name doubles as the database/sql driver name.
	for _, driver := range []string{"mysql", "postgres"} {
		driver := driver // per-iteration copy for the closure
		openers[driver] = func(dsn string) (*sql.DB, error) {
			return sql.Open(driver, dsn)
		}
	}

	// Types that are recognised but not usable yet.
	openers["mssql"] = func(dsn string) (*sql.DB, error) {
		return nil, fmt.Errorf("mssql not support yet")
	}
	openers["sqlite"] = func(dsn string) (*sql.DB, error) {
		return nil, fmt.Errorf("sqlite not support yet")
	}

	return openers
}()