Skip to content

Commit 1b1eb48

Browse files
Merge pull request percona#193 from percona/PMM-12893-rolling-collection-strategy
PMM-12894 Rolling strategy for connections utilization
2 parents 6ae0186 + 0ba671d commit 1b1eb48

11 files changed

+396
-292
lines changed

cmd/postgres_exporter/datasource.go

+31-12
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,16 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
4848
level.Error(logger).Log("msg", "Unable to parse DSN as either URI or connstring", "dsn", loggableDSN(dsn))
4949
continue
5050
}
51-
52-
server, err := e.servers.GetServer(dsn, e.resolutionEnabled)
53-
if err != nil {
54-
level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
55-
continue
56-
}
5751
dsns[dsn] = struct{}{}
5852

5953
// If autoDiscoverDatabases is true, set first dsn as master database (Default: false)
60-
server.master = true
54+
e.masterDSN = dsn
6155

62-
databaseNames, err := queryDatabases(server)
56+
databaseNames, err := e.getDatabaseNames(dsn)
6357
if err != nil {
64-
level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
6558
continue
6659
}
60+
6761
for _, databaseName := range databaseNames {
6862
if contains(e.excludeDatabases, databaseName) {
6963
continue
@@ -99,15 +93,40 @@ func (e *Exporter) discoverDatabaseDSNs() []string {
9993
return result
10094
}
10195

102-
func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
103-
server, err := e.servers.GetServer(dsn, e.resolutionEnabled)
96+
func (e *Exporter) getDatabaseNames(dsn string) ([]string, error) {
97+
if e.connSema != nil {
98+
if err := e.connSema.Acquire(e.ctx, 1); err != nil {
99+
level.Warn(logger).Log("msg", "Failed to acquire semaphore", "err", err)
100+
return nil, err
101+
}
102+
defer e.connSema.Release(1)
103+
}
104104

105+
server, err := e.GetServer(dsn)
106+
if err != nil {
107+
level.Error(logger).Log("msg", "Error opening connection to database", "dsn", loggableDSN(dsn), "err", err)
108+
return nil, err
109+
}
110+
defer server.Close()
111+
112+
dbNames, err := queryDatabases(e.ctx, server)
113+
if err != nil {
114+
level.Error(logger).Log("msg", "Error querying databases", "dsn", loggableDSN(dsn), "err", err)
115+
return nil, err
116+
}
117+
118+
return dbNames, nil
119+
}
120+
121+
func (e *Exporter) scrapeDSN(ch chan<- prometheus.Metric, dsn string) error {
122+
server, err := e.GetServer(dsn)
105123
if err != nil {
106124
return &ErrorConnectToServer{fmt.Sprintf("Error opening connection to database (%s): %s", loggableDSN(dsn), err.Error())}
107125
}
126+
defer server.Close()
108127

109128
// Check if autoDiscoverDatabases is false, set dsn as master database (Default: false)
110-
if !e.autoDiscoverDatabases {
129+
if !e.autoDiscoverDatabases || e.masterDSN == dsn {
111130
server.master = true
112131
}
113132

cmd/postgres_exporter/main.go

+10-136
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,22 @@ package main
1616
import (
1717
"fmt"
1818
"net/http"
19+
_ "net/http/pprof"
1920
"os"
2021
"strings"
2122

22-
_ "net/http/pprof"
23-
2423
"github.com/alecthomas/kingpin/v2"
2524
"github.com/go-kit/log"
2625
"github.com/go-kit/log/level"
27-
"github.com/prometheus-community/postgres_exporter/collector"
2826
"github.com/prometheus-community/postgres_exporter/config"
2927
"github.com/prometheus/client_golang/prometheus"
3028
"github.com/prometheus/client_golang/prometheus/collectors"
31-
"github.com/prometheus/client_golang/prometheus/promhttp"
3229
"github.com/prometheus/common/promlog"
3330
"github.com/prometheus/common/promlog/flag"
3431
"github.com/prometheus/common/version"
3532
"github.com/prometheus/exporter-toolkit/web"
3633
"github.com/prometheus/exporter-toolkit/web/kingpinflag"
34+
"golang.org/x/sync/semaphore"
3735
)
3836

3937
var (
@@ -51,7 +49,7 @@ var (
5149
disableDefaultMetrics = kingpin.Flag("disable-default-metrics", "Do not include default metrics.").Default("false").Envar("PG_EXPORTER_DISABLE_DEFAULT_METRICS").Bool()
5250
disableSettingsMetrics = kingpin.Flag("disable-settings-metrics", "Do not include pg_settings metrics.").Default("false").Envar("PG_EXPORTER_DISABLE_SETTINGS_METRICS").Bool()
5351
autoDiscoverDatabases = kingpin.Flag("auto-discover-databases", "Whether to discover the databases on a server dynamically. (DEPRECATED)").Default("false").Envar("PG_EXPORTER_AUTO_DISCOVER_DATABASES").Bool()
54-
//queriesPath = kingpin.Flag("extend.query-path", "Path to custom queries to run. (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXTEND_QUERY_PATH").String()
52+
// queriesPath = kingpin.Flag("extend.query-path", "Path to custom queries to run. (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXTEND_QUERY_PATH").String()
5553
onlyDumpMaps = kingpin.Flag("dumpmaps", "Do not run, simply dump the maps.").Bool()
5654
constantLabelsList = kingpin.Flag("constantLabels", "A list of label=value separated by comma(,). (DEPRECATED)").Default("").Envar("PG_EXPORTER_CONSTANT_LABELS").String()
5755
excludeDatabases = kingpin.Flag("exclude-databases", "A list of databases to remove when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_EXCLUDE_DATABASES").String()
@@ -103,9 +101,9 @@ func main() {
103101
excludedDatabases := strings.Split(*excludeDatabases, ",")
104102
logger.Log("msg", "Excluded databases", "databases", fmt.Sprintf("%v", excludedDatabases))
105103

106-
//if *queriesPath != "" {
104+
// if *queriesPath != "" {
107105
// level.Warn(logger).Log("msg", "The extended queries.yaml config is DEPRECATED", "file", *queriesPath)
108-
//}
106+
// }
109107

110108
if *autoDiscoverDatabases || *excludeDatabases != "" || *includeDatabases != "" {
111109
level.Warn(logger).Log("msg", "Scraping additional databases via auto discovery is DEPRECATED")
@@ -115,65 +113,18 @@ func main() {
115113
level.Warn(logger).Log("msg", "Constant labels on all metrics is DEPRECATED")
116114
}
117115

118-
servers := NewServers(ServerWithLabels(parseConstLabels(*constantLabelsList)))
119-
120-
opts := []ExporterOpt{
121-
CollectorName("exporter"),
122-
DisableDefaultMetrics(*disableDefaultMetrics),
123-
DisableSettingsMetrics(*disableSettingsMetrics),
124-
AutoDiscoverDatabases(*autoDiscoverDatabases),
125-
WithConstantLabels(*constantLabelsList),
126-
WithServers(servers),
127-
ExcludeDatabases(excludedDatabases),
128-
IncludeDatabases(*includeDatabases),
129-
}
130-
131-
exporter := NewExporter(dsns, opts...)
132-
defer func() {
133-
exporter.servers.Close()
134-
}()
135-
136116
versionCollector := version.NewCollector(exporterName)
137-
prometheus.MustRegister(versionCollector)
138-
139-
prometheus.MustRegister(exporter)
140-
141-
// TODO(@sysadmind): Remove this with multi-target support. We are removing multiple DSN support
142-
dsn := ""
143-
if len(dsns) > 0 {
144-
dsn = dsns[0]
145-
}
146-
147-
cleanup, hr, mr, lr := initializePerconaExporters(dsns, servers)
148-
defer cleanup()
149-
150-
pe, err := collector.NewPostgresCollector(
151-
logger,
152-
excludedDatabases,
153-
dsn,
154-
[]string{},
155-
)
156-
if err != nil {
157-
level.Warn(logger).Log("msg", "Failed to create PostgresCollector", "err", err.Error())
158-
} else {
159-
prometheus.MustRegister(pe)
160-
}
161-
162117
psCollector := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})
163118
goCollector := collectors.NewGoCollector()
164119

165-
promHandler := newHandler(map[string]prometheus.Collector{
166-
"exporter": exporter,
167-
"custom_query.hr": hr,
168-
"custom_query.mr": mr,
169-
"custom_query.lr": lr,
120+
globalCollectors := map[string]prometheus.Collector{
170121
"standard.process": psCollector,
171122
"standard.go": goCollector,
172123
"version": versionCollector,
173-
"postgres": pe,
174-
})
124+
}
175125

176-
http.Handle(*metricsPath, promHandler)
126+
connSema := semaphore.NewWeighted(*maxConnections)
127+
http.Handle(*metricsPath, Handler(logger, dsns, connSema, globalCollectors))
177128

178129
if *metricsPath != "/" && *metricsPath != "" {
179130
landingConfig := web.LandingConfig{
@@ -195,7 +146,7 @@ func main() {
195146
http.Handle("/", landingPage)
196147
}
197148

198-
http.HandleFunc("/probe", handleProbe(logger, excludedDatabases))
149+
http.HandleFunc("/probe", handleProbe(logger, excludedDatabases, connSema))
199150

200151
level.Info(logger).Log("msg", "Listening on address", "address", *webConfig.WebListenAddresses)
201152
srv := &http.Server{}
@@ -204,80 +155,3 @@ func main() {
204155
os.Exit(1)
205156
}
206157
}
207-
208-
// handler wraps an unfiltered http.Handler but uses a filtered handler,
209-
// created on the fly, if filtering is requested. Create instances with
210-
// newHandler. It used for collectors filtering.
211-
type handler struct {
212-
unfilteredHandler http.Handler
213-
collectors map[string]prometheus.Collector
214-
}
215-
216-
func newHandler(collectors map[string]prometheus.Collector) *handler {
217-
h := &handler{collectors: collectors}
218-
219-
innerHandler, err := h.innerHandler()
220-
if err != nil {
221-
level.Error(logger).Log("msg", "Couldn't create metrics handler", "error", err)
222-
os.Exit(1)
223-
}
224-
225-
h.unfilteredHandler = innerHandler
226-
return h
227-
}
228-
229-
// ServeHTTP implements http.Handler.
230-
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
231-
filters := r.URL.Query()["collect[]"]
232-
level.Debug(logger).Log("msg", "Collect query", "filters", filters)
233-
234-
if len(filters) == 0 {
235-
// No filters, use the prepared unfiltered handler.
236-
h.unfilteredHandler.ServeHTTP(w, r)
237-
return
238-
}
239-
240-
filteredHandler, err := h.innerHandler(filters...)
241-
if err != nil {
242-
level.Warn(logger).Log("msg", "Couldn't create filtered metrics handler", "error", err)
243-
w.WriteHeader(http.StatusBadRequest)
244-
w.Write([]byte(fmt.Sprintf("Couldn't create filtered metrics handler: %s", err))) // nolint: errcheck
245-
return
246-
}
247-
248-
filteredHandler.ServeHTTP(w, r)
249-
}
250-
251-
func (h *handler) innerHandler(filters ...string) (http.Handler, error) {
252-
registry := prometheus.NewRegistry()
253-
254-
// register all collectors by default.
255-
if len(filters) == 0 {
256-
for name, c := range h.collectors {
257-
if err := registry.Register(c); err != nil {
258-
return nil, err
259-
}
260-
level.Debug(logger).Log("msg", "Collector was registered", "collector", name)
261-
}
262-
}
263-
264-
// register only filtered collectors.
265-
for _, name := range filters {
266-
if c, ok := h.collectors[name]; ok {
267-
if err := registry.Register(c); err != nil {
268-
return nil, err
269-
}
270-
level.Debug(logger).Log("msg", "Collector was registered", "collector", name)
271-
}
272-
}
273-
274-
handler := promhttp.HandlerFor(
275-
registry,
276-
promhttp.HandlerOpts{
277-
//ErrorLog: log.NewNopLogger() .NewErrorLogger(),
278-
ErrorHandling: promhttp.ContinueOnError,
279-
},
280-
)
281-
282-
return handler, nil
283-
}

0 commit comments

Comments
 (0)