5
5
require "socket" # for Socket.gethostname
6
6
require "manticore"
7
7
require "rufus/scheduler"
8
+ require "yaml" # persistence
8
9
9
10
class LogStash ::Inputs ::HTTP_Poller < LogStash ::Inputs ::Base
10
11
include LogStash ::PluginMixins ::HttpClient
@@ -57,10 +58,15 @@ def setup_requests!
57
58
@requests = Hash [ @urls . map { |name , url | [ name , normalize_request ( url ) ] } ]
58
59
end
59
60
61
+ private
62
+ def filter_dynamic_params ( allowed_keys , params )
63
+ params . slice ( *allowed_keys )
64
+ end
65
+
60
66
private
61
67
def normalize_request ( url_or_spec )
62
68
if url_or_spec . is_a? ( String )
63
- res = [ :get , url_or_spec ]
69
+ res = [ :get , url_or_spec , { } ]
64
70
elsif url_or_spec . is_a? ( Hash )
65
71
# The client will expect keys / values
66
72
spec = Hash [ url_or_spec . clone . map { |k , v | [ k . to_sym , v ] } ] # symbolize keys
@@ -77,17 +83,29 @@ def normalize_request(url_or_spec)
77
83
auth = spec [ :auth ]
78
84
user = spec . delete ( :user ) || ( auth && auth [ "user" ] )
79
85
password = spec . delete ( :password ) || ( auth && auth [ "password" ] )
80
-
86
+
81
87
if user . nil? ^ password . nil?
82
88
raise LogStash ::ConfigurationError , "'user' and 'password' must both be specified for input HTTP poller!"
83
89
end
84
90
85
91
if user && password
86
92
spec [ :auth ] = {
87
- user : user ,
93
+ user : user ,
88
94
pass : password ,
89
95
eager : true
90
- }
96
+ }
97
+
98
+ if spec . delete ( :use_dynamic_params )
99
+ last_dynamic_params_location = spec [ :last_dynamic_params ]
100
+ dynamic_params_map = spec [ :dynamic_params_map ]
101
+
102
+ if last_dynamic_params_location . is_a? ( String ) && File . exist? ( last_dynamic_params_location )
103
+ dynamic_params = YAML . load ( File . read ( last_dynamic_params_location ) )
104
+ allowed_keys = dynamic_params_map . is_a? ( Hash ) ? dynamic_params_map . keys : [ ]
105
+ spec [ :dynamic_params ] = filter_dynamic_params ( allowed_keys , dynamic_params )
106
+ else
107
+ spec [ :dynamic_params ] = { }
108
+ end
91
109
end
92
110
res = [ method , url , spec ]
93
111
else
@@ -133,13 +151,23 @@ def setup_schedule(queue)
133
151
134
152
@scheduler = Rufus ::Scheduler . new ( :max_work_threads => 1 )
135
153
#as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
136
- opts = schedule_type == "every" ? { :first_in => 0.01 } : { }
154
+ opts = schedule_type == "every" ? { :first_in => 0.01 } : { }
137
155
@scheduler . send ( schedule_type , schedule_value , opts ) { run_once ( queue ) }
138
156
@scheduler . join
139
157
end
140
158
159
+ private
160
+ def assign_dynamic_params ( request )
161
+ params = request [ 2 ] [ :dynamic_params ]
162
+ request [ 2 ] [ :query ] = { } if !request [ 2 ] [ :query ]
163
+ params . keys . each do |key |
164
+ request [ 2 ] [ :query ] [ key ] = params [ key ]
165
+ end
166
+ end
167
+
141
168
def run_once ( queue )
142
169
@requests . each do |name , request |
170
+ assign_dynamic_params ( request ) if request [ 2 ] [ :dynamic_params ]
143
171
request_async ( queue , name , request )
144
172
end
145
173
@@ -175,11 +203,23 @@ def handle_success(queue, name, request, response, execution_time)
175
203
end
176
204
end
177
205
206
+ private
207
+ def update_dynamic_params ( request , event )
208
+ request [ 2 ] [ :dynamic_params_map ] . keys . each do |key |
209
+ value = request [ 2 ] [ :dynamic_params_map ] [ key ]
210
+ event_value = event . get ( value )
211
+ request [ 2 ] [ :dynamic_params ] [ key ] = event_value if event_value
212
+ end
213
+ File . write ( request [ 2 ] [ :last_dynamic_params ] , YAML . dump ( request [ 2 ] [ :dynamic_params ] ) )
214
+ end
215
+
178
216
private
179
217
def handle_decoded_event ( queue , name , request , response , event , execution_time )
180
218
apply_metadata ( event , name , request , response , execution_time )
181
219
decorate ( event )
182
220
queue << event
221
+
222
+ update_dynamic_params ( request , event ) if request [ 2 ] [ :dynamic_params ]
183
223
rescue StandardError , java . lang . Exception => e
184
224
@logger . error? && @logger . error ( "Error eventifying response!" ,
185
225
:exception => e ,
0 commit comments