Skip to content

Restart stale watch threads where they left off #275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 23, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def do_work_streaming_refresh
full_refresh
start_watch_threads
else
ensure_watch_threads
targeted_refresh
end
end
Expand Down Expand Up @@ -88,12 +89,22 @@ def start_watch_threads
_log.info("#{log_header} Starting watch threads...")

entity_types.each do |entity_type|
watch_threads[entity_type] = Thread.new { watch_thread(entity_type) }
watch_threads[entity_type] = start_watch_thread(entity_type)
end

_log.info("#{log_header} Starting watch threads...Complete")
end

def ensure_watch_threads
entity_types.each do |entity_type|
next if watch_threads[entity_type].alive?

_log.info("#{log_header} Restarting #{entity_type} watch thread")

watch_threads[entity_type] = start_watch_thread(entity_type)
end
end

def stop_watch_threads
safe_log("#{log_header} Stopping watch threads...")

Expand All @@ -103,14 +114,26 @@ def stop_watch_threads
safe_log("#{log_header} Stopping watch threads...Complete")
end

def start_watch_thread(entity_type)
Thread.new { watch_thread(entity_type) }
end

def watch_thread(entity_type)
_log.info("#{log_header} #{entity_type} watch thread started")

resource_version = resource_versions[entity_type] || "0"
watch_stream = start_watch(entity_type, resource_version)

until finish.value
watch_stream.each { |notice| queue.push(notice) }
until finished?
watch_stream.each do |notice|
# Update the collection resourceVersion to be the most recent
# object's resourceVersion so that if this watch has to be restarted
# it will pick up where it left off.
resource_version = notice.object.metadata.resourceVersion
resource_versions[entity_type] = resource_version
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this updates a shared hash (at distinct keys) from multiple threads, is that fully safe in ruby?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'll use a Concurrent::Hash to be safe


queue.push(notice)
end
end

_log.info("#{log_header} #{entity_type} watch thread exiting")
Expand All @@ -123,6 +146,10 @@ def start_watch(entity_type, resource_version = "0")
connection_for_entity(entity_type).send(watch_method, :resource_version => resource_version)
end

def finished?
finish.value
end

def connection_for_entity(_entity_type)
kubernetes_connection
end
Expand Down