diff --git a/CHANGELOG.md b/CHANGELOG.md index 35c8582..9743dd5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,21 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - yyyy-mm-dd +### Added + +### Changed + +### Fixed + +## [6.6.0] - 2024-06-17 + +### Added + +- add option `strict_order` + +### Fixed +- fix README + ## [6.5.0] - 2024-06-05 ### Added diff --git a/README.md b/README.md index 1603266..66e76fe 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,7 @@ Rails.application.config.outbox.tap do |config| config.paths << Rails.root.join("config/outbox.yml").to_s # optional; configuration file paths, deep merged at the application start, useful with Rails engines # optional (worker v2: default) - c.poller = ActiveSupport::OrderedOptions.new.tap do |pc| + config.poller = ActiveSupport::OrderedOptions.new.tap do |pc| # max parallel threads (per box-item, globally) pc.concurrency = 6 # max threads count (per worker process) @@ -152,7 +152,7 @@ Rails.application.config.outbox.tap do |config| end # optional (worker v2: default) - c.processor = ActiveSupport::OrderedOptions.new.tap do |pc| + config.processor = ActiveSupport::OrderedOptions.new.tap do |pc| # max threads count (per worker process) pc.threads_count = 4 # maximum processing time of the batch, after which the batch will be considered hung and processing will be aborted @@ -236,6 +236,7 @@ default: &default owner: my_outbox_item_team # optional, used in Yabeda metrics retention: P1W # retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations max_retries: 3 # default 0, the number of retries before the item will be marked as failed + strict_order: false # optional, default transports: # transports section produce_message: # underscored transport class name # transport reserved options @@ -255,6 +256,10 @@ production: <<: *default bucket_size: 256 ``` +__CAUTION__: +- ⚠️ If this option is enabled and an error occurs while processing a message in a bucket, +subsequent messages in that bucket won't be processed until the current message is either skipped or successfully processed +- ⚠️ Cannot use `retry_strategies` and the `strict_order` option at the same time ```ruby # app/services/import_order.rb diff --git a/app/models/sbmt/outbox/base_item.rb b/app/models/sbmt/outbox/base_item.rb index 6a9ae63..a4084cb 100644 --- a/app/models/sbmt/outbox/base_item.rb +++ b/app/models/sbmt/outbox/base_item.rb @@ -6,7 +6,7 @@ class BaseItem < Outbox.active_record_base_class self.abstract_class = true class << self - delegate :owner, to: :config + delegate :owner, :strict_order, to: :config def box_type raise NotImplementedError @@ -134,6 +134,7 @@ def retriable? end def max_retries_exceeded? + return false if config.strict_order return true unless retriable? errors_count > config.max_retries diff --git a/app/models/sbmt/outbox/base_item_config.rb b/app/models/sbmt/outbox/base_item_config.rb index 0db85c2..9ee5e68 100644 --- a/app/models/sbmt/outbox/base_item_config.rb +++ b/app/models/sbmt/outbox/base_item_config.rb @@ -16,7 +16,7 @@ def initialize(box_id:, box_name:) end def owner - return @owner if defined? @owner + return @owner if defined?(@owner) @owner = options[:owner].presence || yaml_config[:owner].presence end @@ -57,7 +57,14 @@ def retry_strategies return @retry_strategies if defined?(@retry_strategies) configured_strategies = options[:retry_strategies] - strategies = configured_strategies.presence || %w[exponential_backoff latest_available] + + raise ConfigError, "You cannot use retry_strategies and the strict_order option at the same time." if strict_order.present? && configured_strategies.present? + + strategies = if strict_order.present? && configured_strategies.nil? + [] + else + configured_strategies.presence || %w[exponential_backoff latest_available] + end @retry_strategies ||= Array.wrap(strategies).map do |str_name| "Sbmt::Outbox::RetryStrategies::#{str_name.camelize}".constantize @@ -108,6 +115,12 @@ def transports end end + def strict_order + return @strict_order if defined?(@strict_order) + + @strict_order = options[:strict_order].presence + end + private attr_accessor :box_id, :box_name diff --git a/lib/sbmt/outbox/v2/processor.rb b/lib/sbmt/outbox/v2/processor.rb index 8114092..d485f61 100644 --- a/lib/sbmt/outbox/v2/processor.rb +++ b/lib/sbmt/outbox/v2/processor.rb @@ -68,15 +68,18 @@ def lock_task(scheduled_task) def process(task) lock_timer = Cutoff.new(lock_timeout) last_id = 0 + strict_order = task.item_class.config.strict_order box_worker.item_execution_runtime.measure(task.yabeda_labels) do Outbox.database_switcher.use_master do task.ids.each do |id| - ProcessItem.call(task.item_class, id, worker_version: task.yabeda_labels[:worker_version]) + result = ProcessItem.call(task.item_class, id, worker_version: task.yabeda_labels[:worker_version]) box_worker.job_items_counter.increment(task.yabeda_labels) last_id = id lock_timer.checkpoint! + + break if strict_order == true && result.failure? end end end diff --git a/lib/sbmt/outbox/version.rb b/lib/sbmt/outbox/version.rb index 278e708..b3fe36d 100644 --- a/lib/sbmt/outbox/version.rb +++ b/lib/sbmt/outbox/version.rb @@ -2,6 +2,6 @@ module Sbmt module Outbox - VERSION = "6.5.0" + VERSION = "6.6.0" end end diff --git a/spec/lib/sbmt/outbox/v2/processor_spec.rb b/spec/lib/sbmt/outbox/v2/processor_spec.rb index 14005e7..d7dee4b 100644 --- a/spec/lib/sbmt/outbox/v2/processor_spec.rb +++ b/spec/lib/sbmt/outbox/v2/processor_spec.rb @@ -77,6 +77,56 @@ expect(processor.start).to be(Sbmt::Outbox::V2::ThreadPool::SKIPPED) end + + context "when use option strict_order" do + context "when strict_order is true" do + before do + allow(task.item_class).to receive(:config).and_return(OpenStruct.new(strict_order: true)) + end + + it "stops processing on failure" do + expect(processor.send(:lock_manager)).to receive(:lock) + .with("sbmt:outbox:processor:inbox_item:0:lock", 1000) + .and_yield(task) + + allow(Sbmt::Outbox::ProcessItem).to receive(:call).with(any_args).and_return(OpenStruct.new(failure?: true)) + + expect { processor.start }.to not_change(InboxItem.delivered, :count) + .and measure_yabeda_histogram(Yabeda.box_worker.item_execution_runtime).with_tags(name: "inbox_item", type: :inbox, worker_name: "processor", worker_version: 2) + .and increment_yabeda_counter(Yabeda.box_worker.job_items_counter).with_tags(name: "inbox_item", type: :inbox, worker_name: "processor", worker_version: 2).by(1) + .and not_increment_yabeda_counter(Yabeda.box_worker.job_timeout_counter) + .and not_increment_yabeda_counter(Yabeda.outbox.sent_counter) + .and not_update_yabeda_gauge(Yabeda.outbox.last_sent_event_id) + .and not_measure_yabeda_histogram(Yabeda.outbox.process_latency) + .and not_increment_yabeda_counter(Yabeda.outbox.error_counter) + .and not_increment_yabeda_counter(Yabeda.outbox.retry_counter) + .and not_increment_yabeda_counter(Yabeda.outbox.discarded_counter) + .and not_increment_yabeda_counter(Yabeda.outbox.fetch_error_counter) + end + end + + context "when strict_order is false" do + it "continues processing on failure" do + expect(processor.send(:lock_manager)).to receive(:lock) + .with("sbmt:outbox:processor:inbox_item:0:lock", 1000) + .and_yield(task) + + allow(Sbmt::Outbox::ProcessItem).to receive(:call).with(any_args).and_return(OpenStruct.new(failure?: true)) + + expect { processor.start }.to not_change(InboxItem.delivered, :count) + .and measure_yabeda_histogram(Yabeda.box_worker.item_execution_runtime).with_tags(name: "inbox_item", type: :inbox, worker_name: "processor", worker_version: 2) + .and increment_yabeda_counter(Yabeda.box_worker.job_items_counter).with_tags(name: "inbox_item", type: :inbox, worker_name: "processor", worker_version: 2).by(2) + .and not_increment_yabeda_counter(Yabeda.box_worker.job_timeout_counter) + .and not_increment_yabeda_counter(Yabeda.outbox.sent_counter) + .and not_update_yabeda_gauge(Yabeda.outbox.last_sent_event_id) + .and not_measure_yabeda_histogram(Yabeda.outbox.process_latency) + .and not_increment_yabeda_counter(Yabeda.outbox.error_counter) + .and not_increment_yabeda_counter(Yabeda.outbox.retry_counter) + .and not_increment_yabeda_counter(Yabeda.outbox.discarded_counter) + .and not_increment_yabeda_counter(Yabeda.outbox.fetch_error_counter) + end + end + end end context "when redis job queue is empty" do diff --git a/spec/models/sbmt/outbox/base_item_spec.rb b/spec/models/sbmt/outbox/base_item_spec.rb index 3537c1e..76e81cc 100644 --- a/spec/models/sbmt/outbox/base_item_spec.rb +++ b/spec/models/sbmt/outbox/base_item_spec.rb @@ -19,16 +19,51 @@ expect(outbox_item).to be_max_retries_exceeded end end + + context "when strict order is enabled" do + before do + allow(outbox_item.config).to receive(:strict_order).and_return(true) + end + + it "does not consider retries when strict order is enabled" do + expect(outbox_item).not_to be_max_retries_exceeded + end + end end describe "#retry_strategies" do let(:outbox_item) { create(:combined_outbox_item) } + let(:config) { Sbmt::Outbox::OutboxItemConfig.new(box_id: Combined::OutboxItem.box_id, box_name: Combined::OutboxItem.box_name) } + + before { allow(outbox_item).to receive(:config).and_return(config) } + + context "when retry_strategies are not configured" do + it "uses default retry strategies" do + expect(outbox_item.config.retry_strategies).to eq([ + Sbmt::Outbox::RetryStrategies::ExponentialBackoff, + Sbmt::Outbox::RetryStrategies::LatestAvailable + ]) + end + end - it "has proper defaults" do - expect(outbox_item.config.retry_strategies).to eq([ - Sbmt::Outbox::RetryStrategies::ExponentialBackoff, - Sbmt::Outbox::RetryStrategies::LatestAvailable - ]) + context "when both retry_strategies and strict_order are present" do + before do + allow(outbox_item.config).to receive_messages(strict_order: true, options: {retry_strategies: ["exponential_backoff"]}) + end + + it "raises a ConfigError" do + expect { outbox_item.config.retry_strategies }.to raise_error(Sbmt::Outbox::ConfigError, "You cannot use retry_strategies and the strict_order option at the same time.") + end + end + + context "when only strict_order is configured without retry_strategies" do + before do + allow(outbox_item.config).to receive_messages(strict_order: true, options: {}) + end + + it "retry strategies are not used" do + expect(outbox_item.config.retry_strategies).to eq([]) + end end end diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index ec7de59..049f236 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -47,6 +47,8 @@ RSpec::Matchers.define_negated_matcher :not_increment_yabeda_counter, :increment_yabeda_counter RSpec::Matchers.define_negated_matcher :not_update_yabeda_gauge, :update_yabeda_gauge +RSpec::Matchers.define_negated_matcher :not_measure_yabeda_histogram, :measure_yabeda_histogram +RSpec::Matchers.define_negated_matcher :not_change, :change require "sbmt/outbox/instrumentation/open_telemetry_loader"