diff --git a/contrib/ruby_event_store-outbox/.mutant.yml b/contrib/ruby_event_store-outbox/.mutant.yml index 04beb6b1e7..5874618343 100644 --- a/contrib/ruby_event_store-outbox/.mutant.yml +++ b/contrib/ruby_event_store-outbox/.mutant.yml @@ -14,6 +14,7 @@ matcher: - RubyEventStore::Outbox* ignore: - RubyEventStore::Outbox::CLI* + - RubyEventStore::Outbox::SidekiqProducer#initialize - RubyEventStore::Outbox::SidekiqProducer#call - RubyEventStore::Outbox::SidekiqProcessor#process - RubyEventStore::Outbox::SidekiqProcessor#after_batch @@ -33,7 +34,7 @@ matcher: - RubyEventStore::Outbox::Configuration* - RubyEventStore::Outbox::Consumer#get_remaining_count - RubyEventStore::Outbox::CleanupStrategies::None* - - RubyEventStore::Outbox::Repository* + - RubyEventStore::Outbox::Repositories* - RubyEventStore::Outbox::Runner#initialize - RubyEventStore::Outbox::Runner#run - RubyEventStore::Outbox::Runner#prepare_traps diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox.rb index 4c47500cfb..0b5cfbfed9 100644 --- a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox.rb +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox.rb @@ -8,9 +8,10 @@ module Outbox end require_relative "outbox/fetch_specification" -require_relative "outbox/repository" +require_relative "outbox/repositories/mysql57" require_relative "outbox/sidekiq_scheduler" require_relative "outbox/version" require_relative "outbox/tempo" require_relative "outbox/batch_result" require_relative "outbox/cleanup_strategies" +require_relative "outbox/repositories" diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/consumer.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/consumer.rb index 9b7d7b10ec..e5e6c81642 100644 --- a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/consumer.rb +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/consumer.rb @@ -1,7 +1,7 @@ require "logger" require "redis" require "active_record" -require_relative "repository" +require_relative "repositories/mysql57" require_relative "sidekiq5_format" require_relative "sidekiq_processor" require_relative "fetch_specification" @@ -24,7 +24,7 @@ def initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:) raise "Unknown format" if configuration.message_format != SIDEKIQ5_FORMAT @processor = SidekiqProcessor.new(Redis.new(url: configuration.redis_url)) - @repository = Repository.new(configuration.database_url) + @repository = Repositories::Mysql57.build_for_consumer(configuration.database_url, clock: clock) @cleanup_strategy = CleanupStrategies.build(configuration, repository) end @@ -123,7 +123,7 @@ def log_error(e) end def obtain_lock_for_process(fetch_specification) - result = repository.obtain_lock_for_process(fetch_specification, consumer_uuid, clock: @clock) + result = repository.obtain_lock_for_process(fetch_specification, consumer_uuid) case result when :deadlocked logger.warn "Obtaining lock for split_key '#{fetch_specification.split_key}' failed (deadlock)" @@ -158,7 +158,7 @@ def release_lock_for_process(fetch_specification) end def refresh_lock_for_process(lock) - result = lock.refresh(clock: @clock) + result = repository.refresh_lock(lock) case result when :ok return true diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories.rb new file mode 100644 index 0000000000..04070f6e61 --- /dev/null +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories.rb @@ -0,0 +1,6 @@ +module RubyEventStore + module Outbox + module Repositories + end + end +end diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories/mysql57.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories/mysql57.rb new file mode 100644 index 0000000000..cd49a00af2 --- /dev/null +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories/mysql57.rb @@ -0,0 +1,178 @@ +# frozen_string_literal: true + +require "active_record" +require "active_support/core_ext/numeric/time.rb" + +module RubyEventStore + module Outbox + module Repositories + class Mysql57 + RECENTLY_LOCKED_DURATION = 10.minutes + + class Record < ::ActiveRecord::Base + self.primary_key = :id + self.table_name = "event_store_outbox" + + def self.remaining_for(fetch_specification) + where(format: fetch_specification.message_format, split_key: fetch_specification.split_key, enqueued_at: nil) + end + + def self.for_fetch_specification(fetch_specification) + where(format: fetch_specification.message_format, split_key: fetch_specification.split_key) + end + + def hash_payload + JSON.parse(payload).deep_symbolize_keys + end + + def enqueued? + !enqueued_at.nil? + end + end + + class Lock < ::ActiveRecord::Base + self.table_name = "event_store_outbox_locks" + + def self.obtain(fetch_specification, process_uuid, clock:) + transaction do + l = get_lock_record(fetch_specification) + + if l.recently_locked?(clock: clock) + :taken + else + l.update!(locked_by: process_uuid, locked_at: clock.now) + l + end + end + rescue ::ActiveRecord::Deadlocked + :deadlocked + rescue ::ActiveRecord::LockWaitTimeout + :lock_timeout + end + + def self.refresh(lock, clock:) + transaction do + current_process_uuid = lock.locked_by + lock_record = Lock.lock.find(lock.id) + if lock_record.locked_by == current_process_uuid + lock_record.update!(locked_at: clock.now) + lock.assign_attributes(lock_record.attributes) + :ok + else + :stolen + end + end + rescue ::ActiveRecord::Deadlocked + :deadlocked + rescue ::ActiveRecord::LockWaitTimeout + :lock_timeout + end + + def self.release(fetch_specification, process_uuid) + transaction do + l = get_lock_record(fetch_specification) + if !l.locked_by?(process_uuid) + :not_taken_by_this_process + else + l.update!(locked_by: nil, locked_at: nil) + :ok + end + end + rescue ::ActiveRecord::Deadlocked + :deadlocked + rescue ::ActiveRecord::LockWaitTimeout + :lock_timeout + end + + def locked_by?(process_uuid) + locked_by.eql?(process_uuid) + end + + def recently_locked?(clock:) + locked_by && locked_at > RECENTLY_LOCKED_DURATION.ago(clock.now) + end + + def fetch_specification + FetchSpecification.new(format, split_key) + end + + private + + def self.lock_for_split_key(fetch_specification) + lock.find_by(format: fetch_specification.message_format, split_key: fetch_specification.split_key) + end + + def self.get_lock_record(fetch_specification) + l = lock_for_split_key(fetch_specification) + if l.nil? + begin + l = create!(format: fetch_specification.message_format, split_key: fetch_specification.split_key) + rescue ::ActiveRecord::RecordNotUnique + l = lock_for_split_key(fetch_specification) + end + end + l + end + end + + def self.build_for_consumer(database_url, clock:) + ::ActiveRecord::Base.establish_connection(database_url) unless ::ActiveRecord::Base.connected? + if ::ActiveRecord::Base.connection.adapter_name == "Mysql2" + ::ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;") + end + new(clock: clock) + end + + def self.build_for_producer(clock: Time) + new(clock: clock) + end + + def initialize(clock:) + @clock = clock + end + + def insert_record(format, split_key, payload) + Record.create!(format: format, split_key: split_key, payload: payload) + end + + def retrieve_batch(fetch_specification, batch_size) + Record.remaining_for(fetch_specification).order("id ASC").limit(batch_size).to_a + end + + def get_remaining_count(fetch_specification) + Record.remaining_for(fetch_specification).count + end + + def obtain_lock_for_process(fetch_specification, process_uuid) + Lock.obtain(fetch_specification, process_uuid, clock: clock) + end + + def release_lock_for_process(fetch_specification, process_uuid) + Lock.release(fetch_specification, process_uuid) + end + + def refresh_lock(lock) + Lock.refresh(lock, clock: clock) + end + + def mark_as_enqueued(record, now) + record.update_column(:enqueued_at, now) + end + + def delete_enqueued_older_than(fetch_specification, duration, limit) + scope = Record.for_fetch_specification(fetch_specification).where("enqueued_at < ?", duration.ago) + scope = scope.limit(limit).order(:id) unless limit == :all + scope.delete_all + :ok + rescue ::ActiveRecord::Deadlocked + :deadlocked + rescue ::ActiveRecord::LockWaitTimeout + :lock_timeout + end + + private + attr_reader :clock + end + end + end +end diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repository.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repository.rb deleted file mode 100644 index ad89e6a1a6..0000000000 --- a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repository.rb +++ /dev/null @@ -1,155 +0,0 @@ -# frozen_string_literal: true - -require "active_record" -require "active_support/core_ext/numeric/time.rb" - -module RubyEventStore - module Outbox - class Repository - RECENTLY_LOCKED_DURATION = 10.minutes - - class Record < ::ActiveRecord::Base - self.primary_key = :id - self.table_name = "event_store_outbox" - - def self.remaining_for(fetch_specification) - where(format: fetch_specification.message_format, split_key: fetch_specification.split_key, enqueued_at: nil) - end - - def self.for_fetch_specification(fetch_specification) - where(format: fetch_specification.message_format, split_key: fetch_specification.split_key) - end - - def hash_payload - JSON.parse(payload).deep_symbolize_keys - end - - def enqueued? - !enqueued_at.nil? - end - end - - class Lock < ::ActiveRecord::Base - self.table_name = "event_store_outbox_locks" - - def self.obtain(fetch_specification, process_uuid, clock:) - transaction do - l = get_lock_record(fetch_specification) - - if l.recently_locked?(clock: clock) - :taken - else - l.update!(locked_by: process_uuid, locked_at: clock.now) - l - end - end - rescue ::ActiveRecord::Deadlocked - :deadlocked - rescue ::ActiveRecord::LockWaitTimeout - :lock_timeout - end - - def refresh(clock:) - transaction do - current_process_uuid = locked_by - lock! - if locked_by == current_process_uuid - update!(locked_at: clock.now) - :ok - else - :stolen - end - end - rescue ::ActiveRecord::Deadlocked - :deadlocked - rescue ::ActiveRecord::LockWaitTimeout - :lock_timeout - end - - def self.release(fetch_specification, process_uuid) - transaction do - l = get_lock_record(fetch_specification) - if !l.locked_by?(process_uuid) - :not_taken_by_this_process - else - l.update!(locked_by: nil, locked_at: nil) - :ok - end - end - rescue ::ActiveRecord::Deadlocked - :deadlocked - rescue ::ActiveRecord::LockWaitTimeout - :lock_timeout - end - - def locked_by?(process_uuid) - locked_by.eql?(process_uuid) - end - - def recently_locked?(clock:) - locked_by && locked_at > RECENTLY_LOCKED_DURATION.ago(clock.now) - end - - def fetch_specification - FetchSpecification.new(format, split_key) - end - - private - - def self.lock_for_split_key(fetch_specification) - lock.find_by(format: fetch_specification.message_format, split_key: fetch_specification.split_key) - end - - def self.get_lock_record(fetch_specification) - l = lock_for_split_key(fetch_specification) - if l.nil? - begin - l = create!(format: fetch_specification.message_format, split_key: fetch_specification.split_key) - rescue ::ActiveRecord::RecordNotUnique - l = lock_for_split_key(fetch_specification) - end - end - l - end - end - - def initialize(database_url) - ::ActiveRecord::Base.establish_connection(database_url) unless ::ActiveRecord::Base.connected? - if ::ActiveRecord::Base.connection.adapter_name == "Mysql2" - ::ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;") - end - end - - def retrieve_batch(fetch_specification, batch_size) - Record.remaining_for(fetch_specification).order("id ASC").limit(batch_size).to_a - end - - def get_remaining_count(fetch_specification) - Record.remaining_for(fetch_specification).count - end - - def obtain_lock_for_process(fetch_specification, process_uuid, clock:) - Lock.obtain(fetch_specification, process_uuid, clock: clock) - end - - def release_lock_for_process(fetch_specification, process_uuid) - Lock.release(fetch_specification, process_uuid) - end - - def mark_as_enqueued(record, now) - record.update_column(:enqueued_at, now) - end - - def delete_enqueued_older_than(fetch_specification, duration, limit) - scope = Record.for_fetch_specification(fetch_specification).where("enqueued_at < ?", duration.ago) - scope = scope.limit(limit).order(:id) unless limit == :all - scope.delete_all - :ok - rescue ::ActiveRecord::Deadlocked - :deadlocked - rescue ::ActiveRecord::LockWaitTimeout - :lock_timeout - end - end - end -end diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_producer.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_producer.rb index d3212f4e24..da84660974 100644 --- a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_producer.rb +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_producer.rb @@ -2,11 +2,15 @@ require "sidekiq" require_relative "sidekiq5_format" -require_relative "repository" +require_relative "repositories/mysql57" module RubyEventStore module Outbox class SidekiqProducer + def initialize(repository) + @repository = repository + end + def call(klass, args) sidekiq_client = Sidekiq::Client.new(Sidekiq.redis_pool) item = { "args" => args.map(&:to_h).map { |h| h.transform_keys(&:to_s) }, "class" => klass } @@ -18,11 +22,7 @@ def call(klass, args) normalized_item end if payload - Repository::Record.create!( - format: SIDEKIQ5_FORMAT, - split_key: payload.fetch("queue"), - payload: payload.to_json - ) + repository.insert_record(SIDEKIQ5_FORMAT, payload.fetch("queue"), payload.to_json) end end diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_scheduler.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_scheduler.rb index 18f26a6d65..9920b36c8b 100644 --- a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_scheduler.rb +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_scheduler.rb @@ -5,9 +5,9 @@ module RubyEventStore module Outbox class SidekiqScheduler - def initialize(serializer: RubyEventStore::Serializers::YAML) + def initialize(repository: Repositories::Mysql57.build_for_producer, serializer: RubyEventStore::Serializers::YAML) @serializer = serializer - @sidekiq_producer = SidekiqProducer.new + @sidekiq_producer = SidekiqProducer.new(repository) end def call(klass, record) diff --git a/contrib/ruby_event_store-outbox/spec/consumer_spec.rb b/contrib/ruby_event_store-outbox/spec/consumer_spec.rb index 7b353d78a7..53df781c08 100644 --- a/contrib/ruby_event_store-outbox/spec/consumer_spec.rb +++ b/contrib/ruby_event_store-outbox/spec/consumer_spec.rb @@ -36,8 +36,8 @@ module Outbox end specify "push the jobs to sidekiq" do - record = create_record("default", "default") clock = TickingClock.new + record = create_record("default", "default", clock: clock) consumer = Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics) result = consumer.process @@ -53,10 +53,10 @@ module Outbox end specify "push multiple jobs to different queues" do - create_record("default", "default") - create_record("default", "default") - create_record("default2", "default2") clock = TickingClock.new + create_record("default", "default", clock: clock) + create_record("default", "default", clock: clock) + create_record("default2", "default2", clock: clock) consumer = Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics) @@ -87,10 +87,10 @@ module Outbox end specify "returns false if didnt aquire lock" do - create_record("default", "default") - consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics) clock = TickingClock.new - Repository::Lock.obtain( + create_record("default", "default", clock: clock) + consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics) + Repositories::Mysql57::Lock.obtain( FetchSpecification.new(SIDEKIQ5_FORMAT, "default"), "some-other-process-uuid", clock: clock @@ -152,13 +152,7 @@ module Outbox } ] } - Repository::Record.create!( - split_key: "default", - created_at: Time.now.utc, - format: "sidekiq5", - enqueued_at: nil, - payload: payload.to_json - ) + Repositories::Mysql57.new.insert_record("sidekiq5", "default", payload.to_json) consumer = Consumer.new( SecureRandom.uuid, @@ -174,10 +168,10 @@ module Outbox end specify "incorrect payload wont cause later messages to schedule" do - record1 = create_record("default", "default") - record1.update!(payload: "unparsable garbage") - record2 = create_record("default", "default") clock = TickingClock.new + record1 = create_record("default", "default", clock: clock) + record1.update!(payload: "unparsable garbage") + record2 = create_record("default", "default", clock: clock) consumer = Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics) @@ -191,7 +185,7 @@ module Outbox end specify "deadlock when obtaining lock just skip that attempt" do - expect(Repository::Lock).to receive(:lock).and_raise(::ActiveRecord::Deadlocked) + expect(Repositories::Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::Deadlocked) clock = TickingClock.new consumer = Consumer.new( @@ -211,7 +205,7 @@ module Outbox end specify "lock timeout when obtaining lock just skip that attempt" do - expect(Repository::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) + expect(Repositories::Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) clock = TickingClock.new consumer = Consumer.new( @@ -232,7 +226,7 @@ module Outbox specify "obtaining taken lock just skip that attempt" do clock = TickingClock.new - Repository::Lock.obtain(FetchSpecification.new(SIDEKIQ5_FORMAT, "default"), "other-process-uuid", clock: clock) + Repositories::Mysql57::Lock.obtain(FetchSpecification.new(SIDEKIQ5_FORMAT, "default"), "other-process-uuid", clock: clock) consumer = Consumer.new( SecureRandom.uuid, @@ -251,15 +245,15 @@ module Outbox end specify "deadlock when releasing lock doesnt do anything" do - create_record("default", "default") - allow(Repository::Lock).to receive(:lock).and_wrap_original do |m, *args| + clock = TickingClock.new + create_record("default", "default", clock: clock) + allow(Repositories::Mysql57::Lock).to receive(:lock).and_wrap_original do |m, *args| if caller.any? { |l| l.include? "`release'" } raise ::ActiveRecord::Deadlocked else m.call(*args) end end - clock = TickingClock.new consumer = Consumer.new( SecureRandom.uuid, @@ -278,8 +272,7 @@ module Outbox end specify "lock timeout when releasing lock doesnt do anything" do - create_record("default", "default") - allow(Repository::Lock).to receive(:lock).and_wrap_original do |m, *args| + allow(Repositories::Mysql57::Lock).to receive(:lock).and_wrap_original do |m, *args| if caller.any? { |l| l.include? "`release'" } raise ::ActiveRecord::LockWaitTimeout else @@ -287,6 +280,7 @@ module Outbox end end clock = TickingClock.new + create_record("default", "default", clock: clock) consumer = Consumer.new( SecureRandom.uuid, @@ -305,25 +299,25 @@ module Outbox end specify "after successful loop, lock is released" do - create_record("default", "default") clock = TickingClock.new + create_record("default", "default", clock: clock) consumer = Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics) consumer.process - lock = Repository::Lock.find_by!(split_key: "default") + lock = Repositories::Mysql57::Lock.find_by!(split_key: "default") expect(lock.locked_by).to be_nil expect(lock.locked_at).to be_nil end specify "lock disappearing in the meantime, doesnt do anything" do - create_record("default", "default") clock = TickingClock.new + create_record("default", "default", clock: clock) consumer = Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: test_metrics) allow(consumer).to receive(:release_lock_for_process).and_wrap_original do |m, *args| - Repository::Lock.delete_all + Repositories::Mysql57::Lock.delete_all m.call(*args) end @@ -338,12 +332,12 @@ module Outbox end specify "lock stolen in the meantime, doesnt do anything" do - create_record("default", "default") clock = TickingClock.new + create_record("default", "default", clock: clock) consumer = Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics) allow(consumer).to receive(:release_lock_for_process).and_wrap_original do |m, *args| - Repository::Lock.update_all(locked_by: SecureRandom.uuid) + Repositories::Mysql57::Lock.update_all(locked_by: SecureRandom.uuid) m.call(*args) end @@ -355,12 +349,13 @@ module Outbox end specify "old lock can be reobtained" do - Repository::Lock.obtain( + clock = TickingClock.new(start: 10.minutes.ago) + Repositories::Mysql57::Lock.obtain( FetchSpecification.new(SIDEKIQ5_FORMAT, "default"), "some-old-uuid", - clock: TickingClock.new(start: 10.minutes.ago) + clock: clock ) - record = create_record("default", "default") + record = create_record("default", "default", clock: clock) consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics) result = consumer.process @@ -371,10 +366,11 @@ module Outbox end specify "relatively fresh locks are not reobtained" do - Repository::Lock.obtain( + clock = TickingClock.new(start: 9.minutes.ago) + Repositories::Mysql57::Lock.obtain( FetchSpecification.new(SIDEKIQ5_FORMAT, "default"), "some-old-uuid", - clock: TickingClock.new(start: 9.minutes.ago) + clock: clock ) create_record("default", "default") consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics) @@ -385,11 +381,11 @@ module Outbox end specify "when inserting lock, other process may do same concurrently" do - record = create_record("default", "default") clock = TickingClock.new + record = create_record("default", "default", clock: clock) consumer = Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics) - allow(Repository::Lock).to receive(:create!).and_wrap_original do |m, *args| + allow(Repositories::Mysql57::Lock).to receive(:create!).and_wrap_original do |m, *args| m.call(*args) # To simulate someone inserting a record just before us m.call(*args) end @@ -445,8 +441,8 @@ module Outbox specify "death of a consumer shouldnt prevent other processes from processing" do consumer_1 = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics) - expect(Repository::Record).to receive(:where).and_raise("Unexpected error, such as OOM").ordered - expect(Repository::Record).to receive(:where).and_call_original.ordered.at_least(2).times + expect(Repositories::Mysql57::Record).to receive(:where).and_raise("Unexpected error, such as OOM").ordered + expect(Repositories::Mysql57::Record).to receive(:where).and_call_original.ordered.at_least(2).times expect { consumer_1.process }.to raise_error(/Unexpected error/) create_record("default", "default") @@ -457,21 +453,21 @@ module Outbox # We don't expect both records to be processed (because one of the Locks may be obtained by crashed process, but we expect to do SOME work in ANY splits. expect(result).to eq(true) - expect(Repository::Record.where("enqueued_at is not null").count).to be_positive + expect(Repositories::Mysql57::Record.where("enqueued_at is not null").count).to be_positive end specify "lock is refreshed after each batch" do skip "https://github.com/rspec/rspec-mocks/issues/1306" if RUBY_VERSION >= "3.0" consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics) 2.times.map { |r| create_record("default", "default") } - expect_any_instance_of(Repository::Lock).to receive(:refresh).twice.and_call_original + expect_any_instance_of(Repositories::Mysql57).to receive(:refresh_lock).twice.and_call_original consumer.process end specify "clean old jobs" do - create_record("default", "default") clock = TickingClock.new + create_record("default", "default", clock: clock) consumer = Consumer.new( SecureRandom.uuid, @@ -482,17 +478,17 @@ module Outbox ) consumer.process expect(redis.llen("queue:default")).to eq(1) - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) travel (7.days + 1.minute) consumer.process - expect(Repository::Record.count).to eq(0) + expect(Repositories::Mysql57::Record.count).to eq(0) end specify "clean old jobs with limit" do - 3.times.map { create_record("default", "default") } clock = TickingClock.new + 3.times.map { create_record("default", "default", clock: clock) } consumer = Consumer.new( SecureRandom.uuid, @@ -503,17 +499,17 @@ module Outbox ) consumer.process expect(redis.llen("queue:default")).to eq(3) - expect(Repository::Record.count).to eq(3) + expect(Repositories::Mysql57::Record.count).to eq(3) travel (7.days + 1.minute) consumer.process - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) end specify "clean old jobs - lock timeout" do - create_record("default", "default") clock = TickingClock.new + create_record("default", "default", clock: clock) consumer = Consumer.new( SecureRandom.uuid, @@ -524,20 +520,20 @@ module Outbox ) consumer.process expect(redis.llen("queue:default")).to eq(1) - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) travel (7.days + 1.minute) allow_any_instance_of(::ActiveRecord::Relation).to receive(:delete_all).and_raise(::ActiveRecord::LockWaitTimeout) consumer.process - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) expect(logger_output.string).to include("Cleanup for split_key 'default' failed (lock timeout)") expect(test_metrics.operation_results).to include({ operation: "cleanup", result: "lock_timeout" }) end specify "clean old jobs - deadlock" do - create_record("default", "default") clock = TickingClock.new + create_record("default", "default", clock: clock) consumer = Consumer.new( SecureRandom.uuid, @@ -548,18 +544,18 @@ module Outbox ) consumer.process expect(redis.llen("queue:default")).to eq(1) - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) travel (7.days + 1.minute) allow_any_instance_of(::ActiveRecord::Relation).to receive(:delete_all).and_raise(::ActiveRecord::Deadlocked) consumer.process - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) expect(logger_output.string).to include("Cleanup for split_key 'default' failed (deadlock)") expect(test_metrics.operation_results).to include({ operation: "cleanup", result: "deadlocked" }) end - def create_record(queue, split_key, format: "sidekiq5") + def create_record(queue, split_key, format: "sidekiq5", clock: Time) payload = { class: "SomeAsyncHandler", queue: queue, @@ -575,13 +571,7 @@ def create_record(queue, split_key, format: "sidekiq5") } ] } - Repository::Record.create!( - split_key: split_key, - created_at: Time.now.utc, - format: format, - enqueued_at: nil, - payload: payload.to_json - ) + Repositories::Mysql57.build_for_producer(clock: clock).insert_record(format, split_key, payload.to_json) end end end diff --git a/contrib/ruby_event_store-outbox/spec/repositories/mysql57_spec.rb b/contrib/ruby_event_store-outbox/spec/repositories/mysql57_spec.rb new file mode 100644 index 0000000000..f7fe4b413d --- /dev/null +++ b/contrib/ruby_event_store-outbox/spec/repositories/mysql57_spec.rb @@ -0,0 +1,206 @@ +require "spec_helper" + +module RubyEventStore + module Outbox + module Repositories + ::RSpec.describe Mysql57, db: true do + include SchemaHelper + + let(:database_url) { ENV["DATABASE_URL"] } + let(:message_format) { "some_message_format" } + let(:split_key) { "some_split_key" } + let(:some_process_uuid) { SecureRandom.uuid } + let(:other_process_uuid) { SecureRandom.uuid } + let(:clock) { TickingClock.new } + + specify "successful obtaining returns Lock structure" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(lock).to be_a(Mysql57::Lock) + expect(lock.fetch_specification).to eq(expected_fetch_specification) + expect(lock).to be_locked_by(some_process_uuid) + expect(lock).to be_recently_locked(clock: clock) + end + + specify "Lock is not considered locked after some time" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + + result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + wait_for_lock_duration + + expect(result).not_to be_recently_locked(clock: clock) + end + + specify "trying to obtain taken Lock" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + + result = repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid) + + expect(result).to be(:taken) + end + + specify "obtains a lock for given fetch specification" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + repository.obtain_lock_for_process( + FetchSpecification.new("other_message_format", split_key), + some_process_uuid + ) + repository.obtain_lock_for_process( + FetchSpecification.new(message_format, "other_split_key"), + some_process_uuid + ) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(lock.fetch_specification).to eq(expected_fetch_specification) + end + + specify "successful release" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + + result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(result).to be(:ok) + lock.reload + expect(lock.locked_by).to be_nil + expect(lock.locked_at).to be_nil + end + + specify "released lock can be obtained by other process" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + + repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) + + result = repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid) + expect(result).to be_a(Mysql57::Lock) + end + + specify "cant release not obtained lock" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + + result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(result).to be(:not_taken_by_this_process) + end + + specify "one process cant release other's lock" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + + result = repository.release_lock_for_process(expected_fetch_specification, other_process_uuid) + + expect(result).to be(:not_taken_by_this_process) + end + + specify "lock timeout when obtaining Lock" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + expect(Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) + + result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(result).to be(:lock_timeout) + end + + specify "deadlock when obtaining Lock" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + expect(Mysql57::Lock).to receive(:create!).and_raise(::ActiveRecord::Deadlocked) + + result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(result).to be(:deadlocked) + end + + specify "lock timeout when releasing lock" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + expect(Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) + + result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(result).to be(:lock_timeout) + end + + specify "deadlock when releasing lock" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + expect(Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::Deadlocked) + + result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(result).to be(:deadlocked) + end + + specify "refreshing lock" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + clock.test_travel (Mysql57::RECENTLY_LOCKED_DURATION / 2) + + result = repository.refresh_lock(lock) + + clock.test_travel (Mysql57::RECENTLY_LOCKED_DURATION / 2 + 1.second) + expect(result).to be(:ok) + expect(lock).to be_locked_by(some_process_uuid) + expect(lock).to be_recently_locked(clock: clock) + end + + specify "refreshing lock when other process stole it" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + lock_for_some_process = + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + wait_for_lock_duration + lock_for_other_process = + repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid) + + result = repository.refresh_lock(lock_for_some_process) + + expect(result).to be(:stolen) + end + + specify "lock timeout when refreshing lock" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + expect(Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) + + result = repository.refresh_lock(lock) + + expect(result).to be(:lock_timeout) + end + + specify "deadlocked when refreshing lock" do + repository = Mysql57.build_for_consumer(database_url, clock: clock) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid) + expect(Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::Deadlocked) + + result = repository.refresh_lock(lock) + + expect(result).to be(:deadlocked) + end + + def wait_for_lock_duration + clock.test_travel (Mysql57::RECENTLY_LOCKED_DURATION + 1.second) + end + end + end + end +end diff --git a/contrib/ruby_event_store-outbox/spec/repository_spec.rb b/contrib/ruby_event_store-outbox/spec/repository_spec.rb deleted file mode 100644 index 2a919bf2a5..0000000000 --- a/contrib/ruby_event_store-outbox/spec/repository_spec.rb +++ /dev/null @@ -1,206 +0,0 @@ -require "spec_helper" - -module RubyEventStore - module Outbox - ::RSpec.describe Repository, db: true do - include SchemaHelper - - let(:database_url) { ENV["DATABASE_URL"] } - let(:message_format) { "some_message_format" } - let(:split_key) { "some_split_key" } - let(:some_process_uuid) { SecureRandom.uuid } - let(:other_process_uuid) { SecureRandom.uuid } - let(:clock) { TickingClock.new } - - specify "successful obtaining returns Lock structure" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - expect(lock).to be_a(Repository::Lock) - expect(lock.fetch_specification).to eq(expected_fetch_specification) - expect(lock).to be_locked_by(some_process_uuid) - expect(lock).to be_recently_locked(clock: clock) - end - - specify "Lock is not considered locked after some time" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - - result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - wait_for_lock_duration - - expect(result).not_to be_recently_locked(clock: clock) - end - - specify "trying to obtain taken Lock" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - result = repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid, clock: clock) - - expect(result).to be(:taken) - end - - specify "obtains a lock for given fetch specification" do - repository = Repository.new(database_url) - repository.obtain_lock_for_process( - FetchSpecification.new("other_message_format", split_key), - some_process_uuid, - clock: clock - ) - repository.obtain_lock_for_process( - FetchSpecification.new(message_format, "other_split_key"), - some_process_uuid, - clock: clock - ) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - expect(lock.fetch_specification).to eq(expected_fetch_specification) - end - - specify "successful release" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) - - expect(result).to be(:ok) - lock.reload - expect(lock.locked_by).to be_nil - expect(lock.locked_at).to be_nil - end - - specify "released lock can be obtained by other process" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) - - result = repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid, clock: clock) - expect(result).to be_a(Repository::Lock) - end - - specify "cant release not obtained lock" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - - result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) - - expect(result).to be(:not_taken_by_this_process) - end - - specify "one process cant release other's lock" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - result = repository.release_lock_for_process(expected_fetch_specification, other_process_uuid) - - expect(result).to be(:not_taken_by_this_process) - end - - specify "lock timeout when obtaining Lock" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - expect(Repository::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) - - result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - expect(result).to be(:lock_timeout) - end - - specify "deadlock when obtaining Lock" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - expect(Repository::Lock).to receive(:create!).and_raise(::ActiveRecord::Deadlocked) - - result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - expect(result).to be(:deadlocked) - end - - specify "lock timeout when releasing lock" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - expect(Repository::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) - - result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) - - expect(result).to be(:lock_timeout) - end - - specify "deadlock when releasing lock" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - expect(Repository::Lock).to receive(:lock).and_raise(::ActiveRecord::Deadlocked) - - result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) - - expect(result).to be(:deadlocked) - end - - specify "refreshing lock" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - clock.test_travel (Repository::RECENTLY_LOCKED_DURATION / 2) - - result = lock.refresh(clock: clock) - - clock.test_travel (Repository::RECENTLY_LOCKED_DURATION / 2 + 1.second) - expect(result).to be(:ok) - expect(lock).to be_locked_by(some_process_uuid) - expect(lock).to be_recently_locked(clock: clock) - end - - specify "refreshing lock when other process stole it" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - lock_for_some_process = - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - wait_for_lock_duration - lock_for_other_process = - repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid, clock: clock) - - result = lock_for_some_process.refresh(clock: clock) - - expect(result).to be(:stolen) - end - - specify "lock timeout when refreshing lock" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - expect(lock).to receive(:lock!).and_raise(::ActiveRecord::LockWaitTimeout) - - result = lock.refresh(clock: clock) - - expect(result).to be(:lock_timeout) - end - - specify "deadlocked when refreshing lock" do - repository = Repository.new(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - expect(lock).to receive(:lock!).and_raise(::ActiveRecord::Deadlocked) - - result = lock.refresh(clock: clock) - - expect(result).to be(:deadlocked) - end - - def wait_for_lock_duration - clock.test_travel (Repository::RECENTLY_LOCKED_DURATION + 1.second) - end - end - end -end diff --git a/contrib/ruby_event_store-outbox/spec/sidekiq_scheduler_spec.rb b/contrib/ruby_event_store-outbox/spec/sidekiq_scheduler_spec.rb index 60688c8ab3..02119bd79c 100644 --- a/contrib/ruby_event_store-outbox/spec/sidekiq_scheduler_spec.rb +++ b/contrib/ruby_event_store-outbox/spec/sidekiq_scheduler_spec.rb @@ -64,8 +64,8 @@ def through_outbox? subject.call(CorrectAsyncHandler, event_record) - expect(Repository::Record.count).to eq(1) - record = Repository::Record.first + expect(Repositories::Mysql57::Record.count).to eq(1) + record = Repositories::Mysql57::Record.first expect(record.created_at).to be_present expect(record.enqueued_at).to be_nil expect(record.split_key).to eq("default") @@ -108,7 +108,7 @@ def through_outbox? subject.call(CorrectAsyncHandler, event_record) - record = Repository::Record.first + record = Repositories::Mysql57::Record.first expect(record.split_key).to eq("custom_queue") expect(record.hash_payload[:queue]).to eq("custom_queue") end @@ -130,7 +130,7 @@ def through_outbox? subject.call(CorrectAsyncHandlerWithRetryQueue, event_record) - record = Repository::Record.first + record = Repositories::Mysql57::Record.first expect(record.split_key).to eq("custom_queue") expect(record.hash_payload[:retry_queue]).to eq("custom_queue_retries") end @@ -155,7 +155,7 @@ def through_outbox? subject.call(CorrectAsyncHandler, event_record) - expect(Repository::Record.count).to eq(0) + expect(Repositories::Mysql57::Record.count).to eq(0) end end end