Skip to content

Commit 9b48b09

Browse files
qingling128 and jcantrill
authored and committed
Add retry logic in pod and namespace watches when Kubernetes API connection gets closed. (#204)
* Add retry logic in pod and namespace watches when Kubernetes API connection gets closed. * Refactor and add backup. Change-Id: I6771e12b4fa359b92f7de757b9cf56704693833e * Fix tests. Change-Id: I688a867f6b0a7b88e198b5d5ac5463f02c4022b5 * remove unused variables. Change-Id: I36f2dd8ef34d9bdeb890b0b073a802e6401ace32 * add stats. Change-Id: I5a116f5a91a9251de849a2d679ec277af6272a52 * Add tests for retry. Change-Id: I7fb939bc5dbb3851ed07c674ec22416a43c92432
1 parent d0bd7e4 commit 9b48b09

7 files changed

+248
-84
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ when true (default: `true`)
6161
* `skip_container_metadata` - Skip some of the container data of the metadata. The metadata will not contain the container_image and container_image_id fields.
6262
* `skip_master_url` - Skip the master_url field from the metadata.
6363
* `skip_namespace_metadata` - Skip the namespace_id field from the metadata. The fetch_namespace_metadata function will be skipped. The plugin will be faster and cpu consumption will be less.
64+
* `watch_retry_interval` - The time interval in seconds for retry backoffs when watch connections fail. (default: `1`)
6465

6566
**NOTE:** As of the release 2.1.x of this plugin, it no longer supports parsing the source message into JSON and attaching it to the
6667
payload. The following configuration options are removed:

lib/fluent/plugin/filter_kubernetes_metadata.rb

+18-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
require_relative 'kubernetes_metadata_stats'
2323
require_relative 'kubernetes_metadata_watch_namespaces'
2424
require_relative 'kubernetes_metadata_watch_pods'
25+
26+
require 'fluent/plugin_helper/thread'
2527
require 'fluent/plugin/filter'
2628
require 'resolv'
2729

@@ -37,6 +39,8 @@ class KubernetesMetadataFilter < Fluent::Plugin::Filter
3739

3840
Fluent::Plugin.register_filter('kubernetes_metadata', self)
3941

42+
helpers :thread
43+
4044
config_param :kubernetes_url, :string, default: nil
4145
config_param :cache_size, :integer, default: 1000
4246
config_param :cache_ttl, :integer, default: 60 * 60
@@ -80,6 +84,12 @@ class KubernetesMetadataFilter < Fluent::Plugin::Filter
8084
config_param :skip_container_metadata, :bool, default: false
8185
config_param :skip_master_url, :bool, default: false
8286
config_param :skip_namespace_metadata, :bool, default: false
87+
# The time interval in seconds for retry backoffs when watch connections fail.
# NOTE: must be :integer (was mistakenly :bool) — this value is passed to
# sleep() and multiplied by the backoff base on each retry.
config_param :watch_retry_interval, :integer, default: 1
# The base number of exponential backoff for retries (interval is multiplied
# by this base after every consecutive failure).
config_param :watch_retry_exponential_backoff_base, :integer, default: 2
# The maximum number of times to retry pod and namespace watches before
# raising an unrecoverable error; compared with `<` against the retry count.
config_param :watch_retry_max_times, :integer, default: 10
8393

8494
def fetch_pod_metadata(namespace_name, pod_name)
8595
log.trace("fetching pod metadata: #{namespace_name}/#{pod_name}") if log.trace?
@@ -264,9 +274,14 @@ def log.trace?
264274
end
265275

266276
if @watch
267-
thread = Thread.new(self) { |this| this.start_pod_watch }
268-
thread.abort_on_exception = true
269-
namespace_thread = Thread.new(self) { |this| this.start_namespace_watch }
277+
pod_thread = thread_create :"pod_watch_thread" do
278+
set_up_pod_thread
279+
end
280+
pod_thread.abort_on_exception = true
281+
282+
namespace_thread = thread_create :"namespace_watch_thread" do
283+
set_up_namespace_thread
284+
end
270285
namespace_thread.abort_on_exception = true
271286
end
272287
end

lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb

+73-19
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,81 @@ module WatchNamespaces
2323

2424
include ::KubernetesMetadata::Common
2525

26-
def start_namespace_watch
27-
begin
28-
options = {
29-
resource_version: '0' # Fetch from API server.
30-
}
31-
namespaces = @client.get_namespaces(options)
32-
namespaces.each do |namespace|
33-
cache_key = namespace.metadata['uid']
34-
@namespace_cache[cache_key] = parse_namespace_metadata(namespace)
35-
@stats.bump(:namespace_cache_host_updates)
26+
# Entry point for the namespace watch thread created by the filter plugin.
# Keeps a namespace watch alive, retrying with exponential backoff when the
# Kubernetes API connection fails; raises after @watch_retry_max_times.
def set_up_namespace_thread
  # Any failures / exceptions in the initial setup should raise
  # Fluent::ConfigError, so that users can inspect potential errors in
  # the configuration.
  namespace_watcher = start_namespace_watch
  # Retry state lives in thread-locals so each watcher thread backs off
  # independently (and so tests can inspect the counters).
  Thread.current[:namespace_watch_retry_backoff_interval] = @watch_retry_interval
  Thread.current[:namespace_watch_retry_count] = 0

  # Any failures / exceptions in the followup watcher notice
  # processing will be swallowed and retried. These failures /
  # exceptions could be caused by Kubernetes API being temporarily
  # down. We assume the configuration is correct at this point.
  while thread_current_running?
    begin
      # Re-list and re-create the watcher only after a failure reset it.
      namespace_watcher ||= get_namespaces_and_start_watcher
      process_namespace_watcher_notices(namespace_watcher)
    rescue Exception => e
      @stats.bump(:namespace_watch_failures)
      if Thread.current[:namespace_watch_retry_count] < @watch_retry_max_times
        # Instead of raising exceptions and crashing Fluentd, swallow
        # the exception and reset the watcher.
        log.info(
          "Exception encountered parsing namespace watch event. " \
          "The connection might have been closed. Sleeping for " \
          "#{Thread.current[:namespace_watch_retry_backoff_interval]} " \
          "seconds and resetting the namespace watcher.", e)
        sleep(Thread.current[:namespace_watch_retry_backoff_interval])
        Thread.current[:namespace_watch_retry_count] += 1
        # Exponential backoff: interval is multiplied by the configured base
        # after each consecutive failure (no upper cap on the interval itself;
        # total attempts are bounded by @watch_retry_max_times).
        Thread.current[:namespace_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
        namespace_watcher = nil
      else
        # Since retries failed for many times, log as errors instead
        # of info and raise exceptions and trigger Fluentd to restart.
        message =
          "Exception encountered parsing namespace watch event. The " \
          "connection might have been closed. Retried " \
          "#{@watch_retry_max_times} times yet still failing. Restarting."
        log.error(message, e)
        raise Fluent::UnrecoverableError.new(message)
      end
    end
  end
end
69+
70+
# Perform the initial namespace list+watch. Any failure during this first
# setup is surfaced as a Fluent::ConfigError so the user can inspect and
# fix the plugin configuration.
def start_namespace_watch
  begin
    get_namespaces_and_start_watcher
  rescue Exception => e
    message = "start_namespace_watch: Exception encountered setting up " \
              "namespace watch from Kubernetes API #{@apiVersion} endpoint " \
              "#{@kubernetes_url}: #{e.message}"
    message += " (#{e.response})" if e.respond_to?(:response)
    log.debug(message)
    raise Fluent::ConfigError, message
  end
end
4381

44-
raise Fluent::ConfigError, message
82+
# List all namespaces, seed the namespace cache from the listing, record
# the listing's resourceVersion, and return a watcher that resumes from
# that resourceVersion.
def get_namespaces_and_start_watcher
  query_opts = {
    resource_version: '0' # Fetch from API server.
  }
  namespace_list = @client.get_namespaces(query_opts)
  namespace_list.each do |namespace|
    @namespace_cache[namespace.metadata['uid']] = parse_namespace_metadata(namespace)
    @stats.bump(:namespace_cache_host_updates)
  end
  query_opts[:resource_version] = namespace_list.resourceVersion
  @client.watch_namespaces(query_opts)
end
98+
99+
# Process a watcher notice and potentially raise an exception.
100+
def process_namespace_watcher_notices(watcher)
46101
watcher.each do |notice|
47102
case notice.type
48103
when 'MODIFIED'
@@ -55,7 +110,7 @@ def start_namespace_watch
55110
@stats.bump(:namespace_cache_watch_misses)
56111
end
57112
when 'DELETED'
58-
# ignore and let age out for cases where
113+
# ignore and let age out for cases where
59114
# deleted but still processing logs
60115
@stats.bump(:namespace_cache_watch_deletes_ignored)
61116
else
@@ -65,6 +120,5 @@ def start_namespace_watch
65120
end
66121
end
67122
end
68-
69123
end
70124
end

lib/fluent/plugin/kubernetes_metadata_watch_pods.rb

+74-20
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,84 @@ module WatchPods
2323

2424
include ::KubernetesMetadata::Common
2525

26-
def start_pod_watch
27-
begin
28-
options = {
29-
resource_version: '0' # Fetch from API server.
30-
}
31-
if ENV['K8S_NODE_NAME']
32-
options[:field_selector] = 'spec.nodeName=' + ENV['K8S_NODE_NAME']
33-
end
34-
pods = @client.get_pods(options)
35-
pods.each do |pod|
36-
cache_key = pod.metadata['uid']
37-
@cache[cache_key] = parse_pod_metadata(pod)
38-
@stats.bump(:pod_cache_host_updates)
26+
# Entry point for the pod watch thread created by the filter plugin.
# Keeps a pod watch alive, retrying with exponential backoff when the
# Kubernetes API connection fails; raises after @watch_retry_max_times.
def set_up_pod_thread
  # Any failures / exceptions in the initial setup should raise
  # Fluent::ConfigError, so that users can inspect potential errors in
  # the configuration.
  pod_watcher = start_pod_watch
  # Retry state lives in thread-locals so each watcher thread backs off
  # independently (and so tests can inspect the counters).
  Thread.current[:pod_watch_retry_backoff_interval] = @watch_retry_interval
  Thread.current[:pod_watch_retry_count] = 0

  # Any failures / exceptions in the followup watcher notice
  # processing will be swallowed and retried. These failures /
  # exceptions could be caused by Kubernetes API being temporarily
  # down. We assume the configuration is correct at this point.
  while thread_current_running?
    begin
      # Re-list and re-create the watcher only after a failure reset it.
      pod_watcher ||= get_pods_and_start_watcher
      process_pod_watcher_notices(pod_watcher)
    rescue Exception => e
      @stats.bump(:pod_watch_failures)
      if Thread.current[:pod_watch_retry_count] < @watch_retry_max_times
        # Instead of raising exceptions and crashing Fluentd, swallow
        # the exception and reset the watcher.
        log.info(
          "Exception encountered parsing pod watch event. The " \
          "connection might have been closed. Sleeping for " \
          "#{Thread.current[:pod_watch_retry_backoff_interval]} " \
          "seconds and resetting the pod watcher.", e)
        sleep(Thread.current[:pod_watch_retry_backoff_interval])
        Thread.current[:pod_watch_retry_count] += 1
        # Exponential backoff: interval is multiplied by the configured base
        # after each consecutive failure (no upper cap on the interval itself;
        # total attempts are bounded by @watch_retry_max_times).
        Thread.current[:pod_watch_retry_backoff_interval] *= @watch_retry_exponential_backoff_base
        pod_watcher = nil
      else
        # Since retries failed for many times, log as errors instead
        # of info and raise exceptions and trigger Fluentd to restart.
        message =
          "Exception encountered parsing pod watch event. The " \
          "connection might have been closed. Retried " \
          "#{@watch_retry_max_times} times yet still failing. Restarting."
        log.error(message, e)
        raise Fluent::UnrecoverableError.new(message)
      end
    end
  end
end
4669

47-
raise Fluent::ConfigError, message
70+
# Perform the initial pod list+watch. Any failure during this first setup
# is surfaced as a Fluent::ConfigError so the user can inspect and fix the
# plugin configuration.
def start_pod_watch
  begin
    get_pods_and_start_watcher
  rescue Exception => e
    message = "start_pod_watch: Exception encountered setting up pod watch " \
              "from Kubernetes API #{@apiVersion} endpoint " \
              "#{@kubernetes_url}: #{e.message}"
    message += " (#{e.response})" if e.respond_to?(:response)
    log.debug(message)
    raise Fluent::ConfigError, message
  end
end
81+
82+
# List all pods (restricted to this node when K8S_NODE_NAME is set), seed
# the pod cache from the listing, record the listing's resourceVersion,
# and return a watcher that resumes from that resourceVersion.
def get_pods_and_start_watcher
  query_opts = {
    resource_version: '0' # Fetch from API server.
  }
  node_name = ENV['K8S_NODE_NAME']
  query_opts[:field_selector] = 'spec.nodeName=' + node_name if node_name
  pod_list = @client.get_pods(query_opts)
  pod_list.each do |pod|
    @cache[pod.metadata['uid']] = parse_pod_metadata(pod)
    @stats.bump(:pod_cache_host_updates)
  end
  query_opts[:resource_version] = pod_list.resourceVersion
  @client.watch_pods(query_opts)
end
49101

102+
# Process a watcher notice and potentially raise an exception.
103+
def process_pod_watcher_notices(watcher)
50104
watcher.each do |notice|
51105
case notice.type
52106
when 'MODIFIED'

test/plugin/test_watch_namespaces.rb

+18-6
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class WatchNamespacesTestTest < WatchTest
7474

7575
test 'namespace list caches namespaces' do
7676
@client.stub :get_namespaces, @initial do
77-
start_namespace_watch
77+
process_namespace_watcher_notices(start_namespace_watch)
7878
assert_equal(true, @namespace_cache.key?('initial_uid'))
7979
assert_equal(true, @namespace_cache.key?('modified_uid'))
8080
assert_equal(2, @stats[:namespace_cache_host_updates])
@@ -86,7 +86,7 @@ class WatchNamespacesTestTest < WatchTest
8686
ENV['K8S_NODE_NAME'] = 'aNodeName'
8787
@client.stub :get_namespaces, @initial do
8888
@client.stub :watch_namespaces, [@modified] do
89-
start_namespace_watch
89+
process_namespace_watcher_notices(start_namespace_watch)
9090
assert_equal(2, @stats[:namespace_cache_host_updates])
9191
assert_equal(1, @stats[:namespace_cache_watch_updates])
9292
end
@@ -96,15 +96,15 @@ class WatchNamespacesTestTest < WatchTest
9696

9797
test 'namespace watch ignores CREATED' do
9898
@client.stub :watch_namespaces, [@created] do
99-
start_namespace_watch
99+
process_namespace_watcher_notices(start_namespace_watch)
100100
assert_equal(false, @namespace_cache.key?('created_uid'))
101101
assert_equal(1, @stats[:namespace_cache_watch_ignored])
102102
end
103103
end
104104

105105
test 'namespace watch ignores MODIFIED when info not in cache' do
106106
@client.stub :watch_namespaces, [@modified] do
107-
start_namespace_watch
107+
process_namespace_watcher_notices(start_namespace_watch)
108108
assert_equal(false, @namespace_cache.key?('modified_uid'))
109109
assert_equal(1, @stats[:namespace_cache_watch_misses])
110110
end
@@ -113,7 +113,7 @@ class WatchNamespacesTestTest < WatchTest
113113
test 'namespace watch updates cache when MODIFIED is received and info is cached' do
114114
@namespace_cache['modified_uid'] = {}
115115
@client.stub :watch_namespaces, [@modified] do
116-
start_namespace_watch
116+
process_namespace_watcher_notices(start_namespace_watch)
117117
assert_equal(true, @namespace_cache.key?('modified_uid'))
118118
assert_equal(1, @stats[:namespace_cache_watch_updates])
119119
end
@@ -122,10 +122,22 @@ class WatchNamespacesTestTest < WatchTest
122122
test 'namespace watch ignores DELETED' do
123123
@namespace_cache['deleted_uid'] = {}
124124
@client.stub :watch_namespaces, [@deleted] do
125-
start_namespace_watch
125+
process_namespace_watcher_notices(start_namespace_watch)
126126
assert_equal(true, @namespace_cache.key?('deleted_uid'))
127127
assert_equal(1, @stats[:namespace_cache_watch_deletes_ignored])
128128
end
129129
end
130130

131+
test 'namespace watch retries when exceptions are encountered' do
132+
@client.stub :get_namespaces, @initial do
133+
@client.stub :watch_namespaces, [[@created, @exception_raised]] do
134+
assert_raise Fluent::UnrecoverableError do
135+
set_up_namespace_thread
136+
end
137+
assert_equal(3, @stats[:namespace_watch_failures])
138+
assert_equal(2, Thread.current[:namespace_watch_retry_count])
139+
assert_equal(4, Thread.current[:namespace_watch_retry_backoff_interval])
140+
end
141+
end
142+
end
131143
end

0 commit comments

Comments
 (0)