Skip to content

Commit 82855bc

Browse files
committed
Restart stale watch threads where they left off
If a watch thread fails for any reason (exception, timeout, etc.), restart it with the last resourceVersion so it picks up where it left off.
1 parent fdc2b86 commit 82855bc

File tree

1 file changed

+39
-9
lines changed

1 file changed

+39
-9
lines changed

app/models/manageiq/providers/kubernetes/container_manager/streaming_refresh_mixin.rb

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def do_work_streaming_refresh
3838
full_refresh
3939
start_watch_threads
4040
else
41+
ensure_watch_threads
4142
targeted_refresh
4243
end
4344
end
@@ -69,12 +70,6 @@ def targeted_refresh
6970
_log.info("Processing #{notices.count} Updates...Complete")
7071
end
7172

72-
# Records the collection-level resourceVersion of every entity type.
#
# For each type in +entity_types+ the collection of the same name is read
# from +inventory.collector+ and its +resourceVersion+ is stored in
# +resource_versions+ under that type.
#
# @param inventory [Object] responds to +collector+; the collector responds
#   to each entity type name and returns an object exposing +resourceVersion+
# @return [Object] whatever +entity_types.each+ returns
def save_resource_versions(inventory)
  entity_types.each do |entity_type|
    resource_version = inventory.collector.send(entity_type).resourceVersion

    # Never replace a stored version with nil: watch_thread falls back to
    # "0" when no version is stored, so clobbering a good value here would
    # throw away the position a restarted watch should resume from.
    next if resource_version.nil?

    resource_versions[entity_type] = resource_version
  end
end
77-
7873
def refresh(collector, parser, persister)
7974
inventory = ManageIQ::Providers::Kubernetes::Inventory.new(persister, collector, parser)
8075

@@ -84,16 +79,35 @@ def refresh(collector, parser, persister)
8479
inventory
8580
end
8681

82+
# Remembers the latest collection-level resourceVersion per entity type.
#
# Each name in +entity_types+ is sent to +inventory.collector+; the returned
# collection's +resourceVersion+ is written into +resource_versions+ unless
# it is nil, in which case the entry for that type is left untouched.
#
# @param inventory [Object] responds to +collector+
# @return [Object] whatever +entity_types.each+ returns
def save_resource_versions(inventory)
  entity_types.each do |type|
    latest = inventory.collector.send(type).resourceVersion
    resource_versions[type] = latest unless latest.nil?
  end
end
end
90+
8791
# Starts one watch thread per entity type and records each thread handle in
# +watch_threads+, logging before and after.
#
# Thread creation goes through +start_watch_thread+ only, so exactly one
# thread is spawned per entity type and the handle stored is the one that
# is actually running.
#
# @return [Object] the result of the final +_log.info+ call
def start_watch_threads
  _log.info("#{log_header} Starting watch threads...")

  entity_types.each do |entity_type|
    watch_threads[entity_type] = start_watch_thread(entity_type)
  end

  _log.info("#{log_header} Starting watch threads...Complete")
end
96100

101+
# Makes sure every entity type has a live watch thread.
#
# For each type in +entity_types+, the thread stored in +watch_threads+ is
# checked with +alive?+. A thread that is dead, or one that was never
# stored, is replaced by a new thread from +start_watch_thread+.
#
# @return [Object] whatever +entity_types.each+ returns
def ensure_watch_threads
  entity_types.each do |entity_type|
    # Safe navigation: a missing entry (nil) is treated like a dead thread
    # instead of raising NoMethodError and aborting the whole check.
    next if watch_threads[entity_type]&.alive?

    _log.info("#{log_header} Restarting #{entity_type} watch thread")

    watch_threads[entity_type] = start_watch_thread(entity_type)
  end
end
110+
97111
def stop_watch_threads
98112
safe_log("#{log_header} Stopping watch threads...")
99113

@@ -103,14 +117,26 @@ def stop_watch_threads
103117
safe_log("#{log_header} Stopping watch threads...Complete")
104118
end
105119

120+
# Spawns a thread that runs +watch_thread+ for a single entity type.
#
# @param entity_type [Object] the entity type handed to +watch_thread+
# @return [Thread] the newly created thread
def start_watch_thread(entity_type)
  # Thread.new forwards its arguments to the block.
  Thread.new(entity_type) { |type| watch_thread(type) }
end
123+
106124
def watch_thread(entity_type)
107125
_log.info("#{log_header} #{entity_type} watch thread started")
108126

109127
resource_version = resource_versions[entity_type] || "0"
110128
watch_stream = start_watch(entity_type, resource_version)
111129

112-
until finish.value
113-
watch_stream.each { |notice| queue.push(notice) }
130+
until finished?
131+
watch_stream.each do |notice|
132+
# Update the collection resourceVersion to be the most recent
133+
# object's resourceVersion so that if this watch has to be restarted
134+
# it will pick up where it left off.
135+
resource_version = notice.object.metadata.resourceVersion
136+
resource_versions[entity_type] = resource_version
137+
138+
queue.push(notice)
139+
end
114140
end
115141

116142
_log.info("#{log_header} #{entity_type} watch thread exiting")
@@ -123,6 +149,10 @@ def start_watch(entity_type, resource_version = "0")
123149
connection_for_entity(entity_type).send(watch_method, :resource_version => resource_version)
124150
end
125151

152+
# Reports whether the watch loops should stop.
#
# NOTE(review): +finish+ is defined outside this view; it presumably holds a
# shared boolean flag — confirm.
#
# @return [Object] the current +finish.value+
def finished?
  finish.value
end
155+
126156
# Returns the connection used to watch the given entity type.
#
# The argument is ignored here: +kubernetes_connection+ is returned for
# every entity type.
#
# @param _entity_type [Object] unused
# @return [Object] the value of +kubernetes_connection+
def connection_for_entity(_entity_type)
  kubernetes_connection
end

0 commit comments

Comments
 (0)