Skip to content

Add option es.indices_includes to filter exported indices #799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ BREAKING CHANGES:
The flag `--es.snapshots` has been renamed to `--collector.snapshots`.

* [CHANGE] Rename --es.snapshots to --collector.snapshots #XXX
* [FEATURE] Add option --es.indices_includes to select exported indices #799

## 1.6.0 / 2023-06-22

Expand Down
22 changes: 14 additions & 8 deletions collector/data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,21 @@ var (

// DataStream Information Struct
type DataStream struct {
logger log.Logger
client *http.Client
url *url.URL
logger log.Logger
client *http.Client
url *url.URL
dataStreamIncludes string

dataStreamMetrics []*dataStreamMetric
}

// NewDataStream defines DataStream Prometheus metrics
func NewDataStream(logger log.Logger, client *http.Client, url *url.URL) *DataStream {
func NewDataStream(logger log.Logger, client *http.Client, url *url.URL, dataStreamIncludes string) *DataStream {
return &DataStream{
logger: logger,
client: client,
url: url,
logger: logger,
client: client,
url: url,
dataStreamIncludes: dataStreamIncludes,

dataStreamMetrics: []*dataStreamMetric{
{
Expand Down Expand Up @@ -96,7 +98,11 @@ func (ds *DataStream) fetchAndDecodeDataStreamStats() (DataStreamStatsResponse,
var dsr DataStreamStatsResponse

u := *ds.url
u.Path = path.Join(u.Path, "/_data_stream/*/_stats")
if len(ds.dataStreamIncludes) == 0 {
u.Path = path.Join(u.Path, "/_data_stream/*/_stats")
} else {
u.Path = path.Join(u.Path, "_data_stream", ds.dataStreamIncludes, "_stats")
}
res, err := ds.client.Get(u.String())
if err != nil {
return dsr, fmt.Errorf("failed to get data stream stats health from %s://%s:%s%s: %s",
Expand Down
2 changes: 1 addition & 1 deletion collector/data_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestDataStream(t *testing.T) {
t.Fatal(err)
}

c := NewDataStream(log.NewNopLogger(), http.DefaultClient, u)
c := NewDataStream(log.NewNopLogger(), http.DefaultClient, u, "")
if err != nil {
t.Fatal(err)
}
Expand Down
32 changes: 20 additions & 12 deletions collector/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@ package collector
import (
"encoding/json"
"fmt"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
"github.com/prometheus/client_golang/prometheus"
"io"
"net/http"
"net/url"
"path"
"sort"
"strconv"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus-community/elasticsearch_exporter/pkg/clusterinfo"
"github.com/prometheus/client_golang/prometheus"
)

type labels struct {
Expand Down Expand Up @@ -61,6 +62,7 @@ type Indices struct {
url *url.URL
shards bool
aliases bool
indicesIncludes string
clusterInfoCh chan *clusterinfo.Response
lastClusterInfo *clusterinfo.Response

Expand All @@ -74,7 +76,7 @@ type Indices struct {
}

// NewIndices defines Indices Prometheus metrics
func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards bool, includeAliases bool) *Indices {
func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards bool, includeAliases bool, indicesIncludes string) *Indices {

indexLabels := labels{
keys: func(...string) []string {
Expand Down Expand Up @@ -119,12 +121,13 @@ func NewIndices(logger log.Logger, client *http.Client, url *url.URL, shards boo
}

indices := &Indices{
logger: logger,
client: client,
url: url,
shards: shards,
aliases: includeAliases,
clusterInfoCh: make(chan *clusterinfo.Response),
logger: logger,
client: client,
url: url,
shards: shards,
aliases: includeAliases,
indicesIncludes: indicesIncludes,
clusterInfoCh: make(chan *clusterinfo.Response),
lastClusterInfo: &clusterinfo.Response{
ClusterName: "unknown_cluster",
},
Expand Down Expand Up @@ -1102,7 +1105,12 @@ func (i *Indices) fetchAndDecodeIndexStats() (indexStatsResponse, error) {
var isr indexStatsResponse

u := *i.url
u.Path = path.Join(u.Path, "/_all/_stats")
if len(i.indicesIncludes) == 0 {
u.Path = path.Join(u.Path, "/_all/_stats")
} else {
u.Path = path.Join(u.Path, i.indicesIncludes, "_stats")
}

if i.shards {
u.RawQuery = "ignore_unavailable=true&level=shards"
} else {
Expand Down
22 changes: 14 additions & 8 deletions collector/indices_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type indicesMappingsMetric struct {

// IndicesMappings information struct
type IndicesMappings struct {
logger log.Logger
client *http.Client
url *url.URL
logger log.Logger
client *http.Client
url *url.URL
indicesIncludes string

up prometheus.Gauge
totalScrapes, jsonParseFailures prometheus.Counter
Expand All @@ -49,13 +50,14 @@ type IndicesMappings struct {
}

// NewIndicesMappings defines Indices IndexMappings Prometheus metrics
func NewIndicesMappings(logger log.Logger, client *http.Client, url *url.URL) *IndicesMappings {
func NewIndicesMappings(logger log.Logger, client *http.Client, url *url.URL, indicesIncludes string) *IndicesMappings {
subsystem := "indices_mappings_stats"

return &IndicesMappings{
logger: logger,
client: client,
url: url,
logger: logger,
client: client,
url: url,
indicesIncludes: indicesIncludes,

up: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, subsystem, "up"),
Expand Down Expand Up @@ -157,7 +159,11 @@ func (im *IndicesMappings) getAndParseURL(u *url.URL) (*IndicesMappingsResponse,

func (im *IndicesMappings) fetchAndDecodeIndicesMappings() (*IndicesMappingsResponse, error) {
u := *im.url
u.Path = path.Join(u.Path, "/_all/_mappings")
if len(im.indicesIncludes) == 0 {
u.Path = path.Join(u.Path, "/_all/_mappings")
} else {
u.Path = path.Join(u.Path, im.indicesIncludes, "_mappings")
}
return im.getAndParseURL(&u)
}

Expand Down
4 changes: 2 additions & 2 deletions collector/indices_mappings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestMapping(t *testing.T) {
if err != nil {
t.Fatalf("Failed to parse URL: %s", err)
}
c := NewIndicesMappings(log.NewNopLogger(), http.DefaultClient, u)
c := NewIndicesMappings(log.NewNopLogger(), http.DefaultClient, u, "")
imr, err := c.fetchAndDecodeIndicesMappings()
if err != nil {
t.Fatalf("Failed to fetch or decode indices mappings: %s", err)
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestIndexMappingFieldCount(t *testing.T) {
if err != nil {
t.Fatalf("Failed to parse URL: %s", err)
}
c := NewIndicesMappings(log.NewNopLogger(), http.DefaultClient, u)
c := NewIndicesMappings(log.NewNopLogger(), http.DefaultClient, u, "")
indicesMappingsResponse, err := c.fetchAndDecodeIndicesMappings()
if err != nil {
t.Fatalf("Failed to fetch or decode indices mappings: %s", err)
Expand Down
22 changes: 14 additions & 8 deletions collector/indices_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (

// IndicesSettings information struct
type IndicesSettings struct {
logger log.Logger
client *http.Client
url *url.URL
logger log.Logger
client *http.Client
url *url.URL
indicesIncludes string

up prometheus.Gauge
readOnlyIndices prometheus.Gauge
Expand All @@ -52,11 +53,12 @@ type indicesSettingsMetric struct {
}

// NewIndicesSettings defines Indices Settings Prometheus metrics
func NewIndicesSettings(logger log.Logger, client *http.Client, url *url.URL) *IndicesSettings {
func NewIndicesSettings(logger log.Logger, client *http.Client, url *url.URL, indicesIncludes string) *IndicesSettings {
return &IndicesSettings{
logger: logger,
client: client,
url: url,
logger: logger,
client: client,
url: url,
indicesIncludes: indicesIncludes,

up: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "indices_settings_stats", "up"),
Expand Down Expand Up @@ -154,7 +156,11 @@ func (cs *IndicesSettings) getAndParseURL(u *url.URL, data interface{}) error {
func (cs *IndicesSettings) fetchAndDecodeIndicesSettings() (IndicesSettingsResponse, error) {

u := *cs.url
u.Path = path.Join(u.Path, "/_all/_settings")
if len(cs.indicesIncludes) == 0 {
u.Path = path.Join(u.Path, "/_all/_settings")
} else {
u.Path = path.Join(u.Path, cs.indicesIncludes, "_settings")
}
var asr IndicesSettingsResponse
err := cs.getAndParseURL(&u, &asr)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion collector/indices_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestIndicesSettings(t *testing.T) {
if err != nil {
t.Fatalf("Failed to parse URL: %s", err)
}
c := NewIndicesSettings(log.NewNopLogger(), http.DefaultClient, u)
c := NewIndicesSettings(log.NewNopLogger(), http.DefaultClient, u, "")
nsr, err := c.fetchAndDecodeIndicesSettings()
if err != nil {
t.Fatalf("Failed to fetch or decode indices settings: %s", err)
Expand Down
65 changes: 63 additions & 2 deletions collector/indices_test.go

Large diffs are not rendered by default.

22 changes: 14 additions & 8 deletions collector/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ type ShardResponse struct {

// Shards information struct
type Shards struct {
logger log.Logger
client *http.Client
url *url.URL
logger log.Logger
client *http.Client
url *url.URL
indicesIncludes string

nodeShardMetrics []*nodeShardMetric
jsonParseFailures prometheus.Counter
Expand All @@ -59,11 +60,12 @@ type nodeShardMetric struct {
}

// NewShards defines Shards Prometheus metrics
func NewShards(logger log.Logger, client *http.Client, url *url.URL) *Shards {
func NewShards(logger log.Logger, client *http.Client, url *url.URL, indicesIncludes string) *Shards {
return &Shards{
logger: logger,
client: client,
url: url,
logger: logger,
client: client,
url: url,
indicesIncludes: indicesIncludes,

nodeShardMetrics: []*nodeShardMetric{
{
Expand Down Expand Up @@ -126,7 +128,11 @@ func (s *Shards) getAndParseURL(u *url.URL) ([]ShardResponse, error) {
func (s *Shards) fetchAndDecodeShards() ([]ShardResponse, error) {

u := *s.url
u.Path = path.Join(u.Path, "/_cat/shards")
if len(s.indicesIncludes) == 0 {
u.Path = path.Join(u.Path, "/_cat/shards")
} else {
u.Path = path.Join(u.Path, "/_cat/shards", s.indicesIncludes)
}
q := u.Query()
q.Set("format", "json")
u.RawQuery = q.Encode()
Expand Down
13 changes: 8 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func main() {
esExportIndices = kingpin.Flag("es.indices",
"Export stats for indices in the cluster.").
Default("false").Bool()
esExportIndicesIncludes = kingpin.Flag("es.indices_includes",
"Comma-separated list of data streams, indices, and aliases used to select indices to export (defaults to all).").
Default("").String()
esExportIndicesSettings = kingpin.Flag("es.indices_settings",
"Export stats for settings of all indices of the cluster.").
Default("false").Bool()
Expand Down Expand Up @@ -199,8 +202,8 @@ func main() {
prometheus.MustRegister(collector.NewNodes(logger, httpClient, esURL, *esAllNodes, *esNode))

if *esExportIndices || *esExportShards {
prometheus.MustRegister(collector.NewShards(logger, httpClient, esURL))
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases)
prometheus.MustRegister(collector.NewShards(logger, httpClient, esURL, *esExportIndicesIncludes))
iC := collector.NewIndices(logger, httpClient, esURL, *esExportShards, *esExportIndexAliases, *esExportIndicesIncludes)
prometheus.MustRegister(iC)
if registerErr := clusterInfoRetriever.RegisterConsumer(iC); registerErr != nil {
level.Error(logger).Log("msg", "failed to register indices collector in cluster info")
Expand All @@ -213,15 +216,15 @@ func main() {
}

if *esExportDataStream {
prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL, *esExportIndicesIncludes))
}

if *esExportIndicesSettings {
prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL))
prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL, *esExportIndicesIncludes))
}

if *esExportIndicesMappings {
prometheus.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL))
prometheus.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL, *esExportIndicesIncludes))
}

if *esExportILM {
Expand Down