Skip to content

Commit 9ba5ea3

Browse files
committed
Feat: scrape hot threads CPU consumption percentage
Signed-off-by: avinash kumar <[email protected]>
1 parent edd28cd commit 9ba5ea3

File tree

5 files changed

+276
-0
lines changed

5 files changed

+276
-0
lines changed

collector/hot_threads.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
// Copyright 2021 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package collector
14+
15+
import (
16+
"fmt"
17+
"github.com/go-kit/kit/log"
18+
"github.com/go-kit/kit/log/level"
19+
"github.com/prometheus/client_golang/prometheus"
20+
"io/ioutil"
21+
"net/http"
22+
"net/url"
23+
"os"
24+
"path"
25+
"regexp"
26+
"strconv"
27+
"strings"
28+
)
29+
30+
var (
31+
defaultHotThreadsLabels = []string{"node", "thread_name", "thread_id"}
32+
33+
defaultHotThreadsLabelValues = func(HotThreads string) []string {
34+
return []string{
35+
HotThreads,
36+
}
37+
}
38+
NODE_OUTPUT_SEPERATOR = ":::"
39+
HOT_THREADS_OP_REGEX = `^?([0-9]*[.])?[0-9]+%.*`
40+
CPU_PERCENTAGE_REGEX = `^?([0-9]*[.])?[0-9]+%`
41+
)
42+
43+
// HotThreads information struct
44+
type HotThreads struct {
45+
logger log.Logger
46+
url *url.URL
47+
48+
HotThreadsMetrics HotThreadsMetric
49+
HotThreadsFailureMetrics HotThreadsStepFailureMetric
50+
51+
jsonParseFailures prometheus.Counter
52+
}
53+
54+
type HotThreadsMetric struct {
55+
Type prometheus.ValueType
56+
Desc *prometheus.Desc
57+
Value func(HotThreadsExp float64) float64
58+
Labels func(HotThreadsDataNode string, HotThreadsName, HotThreadsId string) []string
59+
}
60+
61+
type HotThreadsStepFailureMetric struct {
62+
Type prometheus.ValueType
63+
Desc *prometheus.Desc
64+
Value func(HotThreadsExp int64) float64
65+
Labels func(HotThreadsIndex string, HotThreadsPolicy string, action string, step string) []string
66+
}
67+
68+
func getEnv(key, defaultVal string) string {
69+
value, exists := os.LookupEnv(key)
70+
if !exists {
71+
value = defaultVal
72+
}
73+
return value
74+
}
75+
76+
// NewHotThreadsExplain defines HotThreads Prometheus metrics
77+
func NewHotThreads(logger log.Logger, url *url.URL) *HotThreads {
78+
return &HotThreads{
79+
logger: logger,
80+
url: url,
81+
82+
HotThreadsMetrics: HotThreadsMetric{
83+
Type: prometheus.GaugeValue,
84+
Desc: prometheus.NewDesc(
85+
prometheus.BuildFQName(namespace, "hot_threads", "cpu_usage_percentage"),
86+
"Hot Threads cpu usage on data nodes",
87+
defaultHotThreadsLabels, nil,
88+
),
89+
Value: func(HotThreadsCpuPercentage float64) float64 {
90+
return float64(HotThreadsCpuPercentage)
91+
},
92+
Labels: func(HotThreadsDataNode string, HotThreadsName, HotThreadsId string) []string {
93+
return []string{HotThreadsDataNode, HotThreadsName, HotThreadsId}
94+
},
95+
},
96+
97+
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
98+
Name: prometheus.BuildFQName(namespace, "hot_threads", "json_parse_failures"),
99+
Help: "Number of errors while parsing JSON.",
100+
}),
101+
}
102+
}
103+
104+
// Describe HotThreads
105+
func (s *HotThreads) Describe(ch chan<- *prometheus.Desc) {
106+
ch <- s.jsonParseFailures.Desc()
107+
ch <- s.HotThreadsMetrics.Desc
108+
}
109+
110+
func (s *HotThreads) getAndParseURL(u *url.URL, hotThreads *[]HotThreadsRsp) error {
111+
res, err := http.Get(u.String())
112+
if err != nil {
113+
return fmt.Errorf("failed to get from %s://%s:%s%s: %s",
114+
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
115+
}
116+
117+
body, err := ioutil.ReadAll(res.Body)
118+
if err != nil {
119+
_ = level.Warn(s.logger).Log(
120+
"msg", "failed to get resp body",
121+
"err", err,
122+
)
123+
}
124+
125+
defer func() {
126+
err = res.Body.Close()
127+
if err != nil {
128+
_ = level.Warn(s.logger).Log(
129+
"msg", "failed to close http.Client",
130+
"err", err,
131+
)
132+
}
133+
}()
134+
135+
if res.StatusCode != http.StatusOK {
136+
return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
137+
}
138+
139+
sb := string(body)
140+
hotThreadsNodeOp := strings.Split(string(sb), NODE_OUTPUT_SEPERATOR)
141+
142+
for _, nodeData := range hotThreadsNodeOp {
143+
nodeName := strings.Trim(strings.Split(nodeData, "}")[0], " {")
144+
145+
hotThreadsOpRegex := regexp.MustCompile(HOT_THREADS_OP_REGEX)
146+
allHotThreads := hotThreadsOpRegex.FindAllString(nodeData, -1)
147+
cpuPercentageRegex := regexp.MustCompile(CPU_PERCENTAGE_REGEX)
148+
149+
for _, v := range allHotThreads {
150+
cpu := string(cpuPercentageRegex.FindString(v))
151+
cpu = strings.Trim(cpu, "%")
152+
threadName := ""
153+
threadId := ""
154+
data := strings.Split(v, "usage by thread")
155+
if len(data) > 1 {
156+
// longThreadName would be one of these string patterns -
157+
// "process reaper"
158+
// "elasticsearch[keepAlive/7.0.1]"
159+
// "elasticsearch[dragoneye-es-managed-data-6][refresh][T#3]"
160+
// "elasticsearch[elasticsearch-data-0][[geonames][0]: Lucene Merge Thread #12]"
161+
162+
longThreadName := data[1]
163+
threadName = longThreadName
164+
threadId = ""
165+
// does not contain "[]" or ":" with exception of elasticsearch[keepAlive/7.0.1]
166+
if strings.Contains(longThreadName, "[") || strings.Contains(longThreadName, ":") {
167+
if strings.Contains(longThreadName, "keepAlive") {
168+
threadName = "keepAlive"
169+
threadId = ""
170+
} else {
171+
if strings.Contains(longThreadName, "Lucene Merge Thread") {
172+
// lucene merge thread like - elasticsearch[elasticsearch-data-0][[geonames][0]: Lucene Merge Thread #12]
173+
thread := strings.Trim(strings.Split(longThreadName, ":")[1], "[]'")
174+
threadName = "merge"
175+
threadId = strings.Split(thread, "#")[1]
176+
} else {
177+
// search, write, refresh, transport_worker etc. like - elasticsearch[elasticsearch-data-0][write][T#2]
178+
threadName = strings.Trim(strings.Split(longThreadName, "][")[1], "[]'")
179+
threadId = strings.Trim((strings.Split(longThreadName, "][")[2]), "T#[]'")
180+
}
181+
}
182+
}
183+
}
184+
cpuPercentage := 0.0
185+
cpuPercentage, err := strconv.ParseFloat(cpu, 64)
186+
if err != nil {
187+
_ = level.Warn(s.logger).Log(
188+
"msg", "error parsing cpu percentage",
189+
"info", err,
190+
)
191+
}
192+
t := &HotThreadsRsp{CpuPercentage: cpuPercentage, Node: nodeName, ThreadName: threadName, ThreadId: threadId}
193+
*hotThreads = append(*hotThreads, *t)
194+
}
195+
}
196+
197+
return nil
198+
}
199+
200+
func (s *HotThreads) fetchAndDecodeHotThreads() ([]HotThreadsRsp, error) {
201+
202+
u := *s.url
203+
u.Path = path.Join(u.Path, "/_nodes/hot_threads")
204+
205+
var MAX_HOT_THREADS_COUNT = getEnv("MAX_HOT_THREADS_COUNT", "3")
206+
var HOT_THREADS_SECOND_SAMPLING_INTERVAL = getEnv("HOT_THREADS_SECOND_SAMPLING_INTERVAL", "500ms")
207+
208+
q := u.Query()
209+
q.Set("threads", MAX_HOT_THREADS_COUNT)
210+
q.Set("interval", HOT_THREADS_SECOND_SAMPLING_INTERVAL)
211+
u.RawQuery = q.Encode()
212+
u.RawPath = q.Encode()
213+
var ifr []HotThreadsRsp
214+
err := s.getAndParseURL(&u, &ifr)
215+
216+
if err != nil {
217+
return ifr, err
218+
}
219+
return ifr, err
220+
}
221+
222+
// Collect gets cluster hot threads metric values
223+
func (s *HotThreads) Collect(ch chan<- prometheus.Metric) {
224+
225+
defer func() {
226+
ch <- s.jsonParseFailures
227+
}()
228+
229+
ir, err := s.fetchAndDecodeHotThreads()
230+
if err != nil {
231+
_ = level.Warn(s.logger).Log(
232+
"msg", "failed to fetch and decode HotThreads stats",
233+
"err", err,
234+
)
235+
return
236+
}
237+
238+
for _, t := range ir {
239+
ch <- prometheus.MustNewConstMetric(
240+
s.HotThreadsMetrics.Desc,
241+
s.HotThreadsMetrics.Type,
242+
s.HotThreadsMetrics.Value(t.CpuPercentage),
243+
s.HotThreadsMetrics.Labels(t.Node, t.ThreadName, t.ThreadId)...,
244+
)
245+
}
246+
}

collector/hot_threads_response.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright 2021 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package collector
14+
15+
// _nodes/hot_threads response
16+
type HotThreadsRsp struct {
17+
Node string `json:"node"`
18+
ThreadName string `json:"thread_name"`
19+
ThreadId string `json:"thread_id"`
20+
CpuPercentage float64 `json:"cpu_percentage"`
21+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.16
44

55
require (
66
github.com/blang/semver v3.5.2-0.20180723201105-3c1074078d32+incompatible
7+
github.com/go-kit/kit v0.9.0
78
github.com/go-kit/log v0.2.0
89
github.com/imdario/mergo v0.3.12
910
github.com/prometheus/client_golang v1.11.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9
6565
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
6666
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
6767
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
68+
github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
6869
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
6970
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
7071
github.com/go-kit/log v0.2.0 h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw=
@@ -74,6 +75,7 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
7475
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
7576
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
7677
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
78+
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
7779
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
7880
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
7981
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=

main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ func main() {
8282
esExportSnapshots = kingpin.Flag("es.snapshots",
8383
"Export stats for the cluster snapshots.").
8484
Default("false").Bool()
85+
esExportHotThreads = kingpin.Flag("es.hot_threads",
86+
"Export stats for hot threads on data nodes.").
87+
Default("false").Envar("ES_HOT_THREADS").Bool()
8588
esClusterInfoInterval = kingpin.Flag("es.clusterinfo.interval",
8689
"Cluster info update interval for the cluster label").
8790
Default("5m").Duration()
@@ -188,6 +191,9 @@ func main() {
188191
prometheus.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL))
189192
}
190193

194+
if *esExportHotThreads {
195+
prometheus.MustRegister(collector.NewHotThreads(logger, esURL))
196+
}
191197
// create a http server
192198
server := &http.Server{}
193199

0 commit comments

Comments
 (0)