Skip to content

Commit

Permalink
Merge branch 'feat/DEX-2151/add-mode-preserve-ordering' into 'master'
Browse files Browse the repository at this point in the history
[DEX-2151] feat: add option strict_order

Closes DEX-2151

See merge request nstmrt/rubygems/outbox!98
  • Loading branch information
Arlantir committed Jun 20, 2024
2 parents 900618a + a12d4d6 commit 1bf7135
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 12 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased] - yyyy-mm-dd

### Added

### Changed

### Fixed

## [6.6.0] - 2024-06-17

### Added

- add option `strict_order`

### Fixed
- fix README

## [6.5.0] - 2024-06-05

### Added
Expand Down
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ Rails.application.config.outbox.tap do |config|
config.paths << Rails.root.join("config/outbox.yml").to_s # optional; configuration file paths, deep merged at the application start, useful with Rails engines

# optional (worker v2: default)
c.poller = ActiveSupport::OrderedOptions.new.tap do |pc|
config.poller = ActiveSupport::OrderedOptions.new.tap do |pc|
# max parallel threads (per box-item, globally)
pc.concurrency = 6
# max threads count (per worker process)
Expand Down Expand Up @@ -152,7 +152,7 @@ Rails.application.config.outbox.tap do |config|
end

# optional (worker v2: default)
c.processor = ActiveSupport::OrderedOptions.new.tap do |pc|
config.processor = ActiveSupport::OrderedOptions.new.tap do |pc|
# max threads count (per worker process)
pc.threads_count = 4
# maximum processing time of the batch, after which the batch will be considered hung and processing will be aborted
Expand Down Expand Up @@ -236,6 +236,7 @@ default: &default
owner: my_outbox_item_team # optional, used in Yabeda metrics
retention: P1W # retention period, https://en.wikipedia.org/wiki/ISO_8601#Durations
max_retries: 3 # default 0, the number of retries before the item will be marked as failed
strict_order: false # optional, default
transports: # transports section
produce_message: # underscored transport class name
# transport reserved options
Expand All @@ -255,6 +256,10 @@ production:
<<: *default
bucket_size: 256
```
__CAUTION__:
- ⚠️ If this option is enabled and an error occurs while processing a message in a bucket,
subsequent messages in that bucket won't be processed until the current message is either skipped or successfully processed
- ⚠️ Cannot use `retry_strategies` and the `strict_order` option at the same time

```ruby
# app/services/import_order.rb
Expand Down
3 changes: 2 additions & 1 deletion app/models/sbmt/outbox/base_item.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class BaseItem < Outbox.active_record_base_class
self.abstract_class = true

class << self
delegate :owner, to: :config
delegate :owner, :strict_order, to: :config

def box_type
raise NotImplementedError
Expand Down Expand Up @@ -134,6 +134,7 @@ def retriable?
end

def max_retries_exceeded?
return false if config.strict_order
return true unless retriable?

errors_count > config.max_retries
Expand Down
17 changes: 15 additions & 2 deletions app/models/sbmt/outbox/base_item_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def initialize(box_id:, box_name:)
end

def owner
return @owner if defined? @owner
return @owner if defined?(@owner)

@owner = options[:owner].presence || yaml_config[:owner].presence
end
Expand Down Expand Up @@ -57,7 +57,14 @@ def retry_strategies
return @retry_strategies if defined?(@retry_strategies)

configured_strategies = options[:retry_strategies]
strategies = configured_strategies.presence || %w[exponential_backoff latest_available]

raise ConfigError, "You cannot use retry_strategies and the strict_order option at the same time." if strict_order.present? && configured_strategies.present?

strategies = if strict_order.present? && configured_strategies.nil?
[]
else
configured_strategies.presence || %w[exponential_backoff latest_available]
end

@retry_strategies ||= Array.wrap(strategies).map do |str_name|
"Sbmt::Outbox::RetryStrategies::#{str_name.camelize}".constantize
Expand Down Expand Up @@ -108,6 +115,12 @@ def transports
end
end

def strict_order
return @strict_order if defined?(@strict_order)

@strict_order = options[:strict_order].presence
end

private

attr_accessor :box_id, :box_name
Expand Down
5 changes: 4 additions & 1 deletion lib/sbmt/outbox/v2/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,18 @@ def lock_task(scheduled_task)
def process(task)
lock_timer = Cutoff.new(lock_timeout)
last_id = 0
strict_order = task.item_class.config.strict_order

box_worker.item_execution_runtime.measure(task.yabeda_labels) do
Outbox.database_switcher.use_master do
task.ids.each do |id|
ProcessItem.call(task.item_class, id, worker_version: task.yabeda_labels[:worker_version])
result = ProcessItem.call(task.item_class, id, worker_version: task.yabeda_labels[:worker_version])

box_worker.job_items_counter.increment(task.yabeda_labels)
last_id = id
lock_timer.checkpoint!

break if strict_order == true && result.failure?
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/outbox/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module Outbox
VERSION = "6.5.0"
VERSION = "6.6.0"
end
end
50 changes: 50 additions & 0 deletions spec/lib/sbmt/outbox/v2/processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,56 @@

expect(processor.start).to be(Sbmt::Outbox::V2::ThreadPool::SKIPPED)
end

context "when use option strict_order" do
context "when strict_order is true" do
before do
allow(task.item_class).to receive(:config).and_return(OpenStruct.new(strict_order: true))
end

it "stops processing on failure" do
expect(processor.send(:lock_manager)).to receive(:lock)
.with("sbmt:outbox:processor:inbox_item:0:lock", 1000)
.and_yield(task)

allow(Sbmt::Outbox::ProcessItem).to receive(:call).with(any_args).and_return(OpenStruct.new(failure?: true))

expect { processor.start }.to not_change(InboxItem.delivered, :count)
.and measure_yabeda_histogram(Yabeda.box_worker.item_execution_runtime).with_tags(name: "inbox_item", type: :inbox, worker_name: "processor", worker_version: 2)
.and increment_yabeda_counter(Yabeda.box_worker.job_items_counter).with_tags(name: "inbox_item", type: :inbox, worker_name: "processor", worker_version: 2).by(1)
.and not_increment_yabeda_counter(Yabeda.box_worker.job_timeout_counter)
.and not_increment_yabeda_counter(Yabeda.outbox.sent_counter)
.and not_update_yabeda_gauge(Yabeda.outbox.last_sent_event_id)
.and not_measure_yabeda_histogram(Yabeda.outbox.process_latency)
.and not_increment_yabeda_counter(Yabeda.outbox.error_counter)
.and not_increment_yabeda_counter(Yabeda.outbox.retry_counter)
.and not_increment_yabeda_counter(Yabeda.outbox.discarded_counter)
.and not_increment_yabeda_counter(Yabeda.outbox.fetch_error_counter)
end
end

context "when strict_order is false" do
it "continues processing on failure" do
expect(processor.send(:lock_manager)).to receive(:lock)
.with("sbmt:outbox:processor:inbox_item:0:lock", 1000)
.and_yield(task)

allow(Sbmt::Outbox::ProcessItem).to receive(:call).with(any_args).and_return(OpenStruct.new(failure?: true))

expect { processor.start }.to not_change(InboxItem.delivered, :count)
.and measure_yabeda_histogram(Yabeda.box_worker.item_execution_runtime).with_tags(name: "inbox_item", type: :inbox, worker_name: "processor", worker_version: 2)
.and increment_yabeda_counter(Yabeda.box_worker.job_items_counter).with_tags(name: "inbox_item", type: :inbox, worker_name: "processor", worker_version: 2).by(2)
.and not_increment_yabeda_counter(Yabeda.box_worker.job_timeout_counter)
.and not_increment_yabeda_counter(Yabeda.outbox.sent_counter)
.and not_update_yabeda_gauge(Yabeda.outbox.last_sent_event_id)
.and not_measure_yabeda_histogram(Yabeda.outbox.process_latency)
.and not_increment_yabeda_counter(Yabeda.outbox.error_counter)
.and not_increment_yabeda_counter(Yabeda.outbox.retry_counter)
.and not_increment_yabeda_counter(Yabeda.outbox.discarded_counter)
.and not_increment_yabeda_counter(Yabeda.outbox.fetch_error_counter)
end
end
end
end

context "when redis job queue is empty" do
Expand Down
45 changes: 40 additions & 5 deletions spec/models/sbmt/outbox/base_item_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,51 @@
expect(outbox_item).to be_max_retries_exceeded
end
end

context "when strict order is enabled" do
before do
allow(outbox_item.config).to receive(:strict_order).and_return(true)
end

it "does not consider retries when strict order is enabled" do
expect(outbox_item).not_to be_max_retries_exceeded
end
end
end

describe "#retry_strategies" do
let(:outbox_item) { create(:combined_outbox_item) }
let(:config) { Sbmt::Outbox::OutboxItemConfig.new(box_id: Combined::OutboxItem.box_id, box_name: Combined::OutboxItem.box_name) }

before { allow(outbox_item).to receive(:config).and_return(config) }

context "when retry_strategies are not configured" do
it "uses default retry strategies" do
expect(outbox_item.config.retry_strategies).to eq([
Sbmt::Outbox::RetryStrategies::ExponentialBackoff,
Sbmt::Outbox::RetryStrategies::LatestAvailable
])
end
end

it "has proper defaults" do
expect(outbox_item.config.retry_strategies).to eq([
Sbmt::Outbox::RetryStrategies::ExponentialBackoff,
Sbmt::Outbox::RetryStrategies::LatestAvailable
])
context "when both retry_strategies and strict_order are present" do
before do
allow(outbox_item.config).to receive_messages(strict_order: true, options: {retry_strategies: ["exponential_backoff"]})
end

it "raises a ConfigError" do
expect { outbox_item.config.retry_strategies }.to raise_error(Sbmt::Outbox::ConfigError, "You cannot use retry_strategies and the strict_order option at the same time.")
end
end

context "when only strict_order is configured without retry_strategies" do
before do
allow(outbox_item.config).to receive_messages(strict_order: true, options: {})
end

it "retry strategies are not used" do
expect(outbox_item.config.retry_strategies).to eq([])
end
end
end

Expand Down
2 changes: 2 additions & 0 deletions spec/rails_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@

RSpec::Matchers.define_negated_matcher :not_increment_yabeda_counter, :increment_yabeda_counter
RSpec::Matchers.define_negated_matcher :not_update_yabeda_gauge, :update_yabeda_gauge
RSpec::Matchers.define_negated_matcher :not_measure_yabeda_histogram, :measure_yabeda_histogram
RSpec::Matchers.define_negated_matcher :not_change, :change

require "sbmt/outbox/instrumentation/open_telemetry_loader"

Expand Down

0 comments on commit 1bf7135

Please sign in to comment.