Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable automatic message visibility refresh #87

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
23 changes: 22 additions & 1 deletion lib/aws/rails/sqs_active_job/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ class Executor
fallback_policy: :caller_runs # slow down the producer thread
}.freeze

def initialize(options = {})
def initialize(options = {}, refresh_timeout = nil)
mullermp marked this conversation as resolved.
Show resolved Hide resolved
@executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options))
# Monitor threads used to refresh visiblity
@refresh_timeout = refresh_timeout
@monitor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options)) if refresh_timeout
@logger = options[:logger] || ActiveSupport::Logger.new(STDOUT)
end

# TODO: Consider catching the exception and sleeping instead of using :caller_runs
def execute(message)
# Used to tell the visibilty refresh thread to give up
poison_pill = false
mullermp marked this conversation as resolved.
Show resolved Hide resolved
@executor.post(message) do |message|
begin
job = JobRunner.new(message)
Expand All @@ -36,8 +41,24 @@ def execute(message)
job_msg = job ? "#{job.id}[#{job.class_name}]" : 'unknown job'
@logger.info "Error processing job #{job_msg}: #{e}"
@logger.debug e.backtrace.join("\n")
ensure
poison_pill = true
end
end

@monitor.post(message) do |message|
while poison_pill
begin
# Extend the visibility timeout to the provided refresh timeout
message.change_visibility({ visibility_timeout: @refresh_timeout })
# Wait half the refresh timeout, and repeat
sleep(@refresh_timeout / 2)
mullermp marked this conversation as resolved.
Show resolved Hide resolved
rescue => e
# If anything goes wrong, we want to handle it. We don't care what.
mullermp marked this conversation as resolved.
Show resolved Hide resolved
@logger.error("Monitor process failed for message: #{message.id}. Error: #{e}")
end
end
end if @monitor
end

def shutdown(timeout=nil)
Expand Down
7 changes: 6 additions & 1 deletion lib/aws/rails/sqs_active_job/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ def run

Signal.trap('INT') { raise Interrupt }
Signal.trap('TERM') { raise Interrupt }
@executor = Executor.new(max_threads: @options[:threads], logger: @logger, max_queue: @options[:backpressure])
@executor = Executor.new({
max_threads: @options[:threads],
logger: @logger,
max_queue: @options[:backpressure]
}, refresh_timeout = @options[:refresh_timeout])

poll
rescue Interrupt
Expand Down Expand Up @@ -112,6 +116,7 @@ def parse_args(argv)
opts.on("-t", "--threads INTEGER", Integer, "The maximum number of worker threads to create. Defaults to 2x the number of processors available on this system.") { |a| out[:threads] = a }
opts.on("-b", "--backpressure INTEGER", Integer, "The maximum number of messages to have waiting in the Executor queue. This should be a low, but non zero number. Messages in the Executor queue cannot be picked up by other processes and will slow down shutdown.") { |a| out[:backpressure] = a }
opts.on("-m", "--max_messages INTEGER", Integer, "Max number of messages to receive in a batch from SQS.") { |a| out[:max_messages] = a }
opts.on("-r", "--refresh_timeout", Integer, "Enables message visibility refresh, Refresh timeout is in seconds.") { |a| out[:refresh_timeout] = a }
mullermp marked this conversation as resolved.
Show resolved Hide resolved
opts.on("-v", "--visibility_timeout INTEGER", Integer, "The visibility timeout is the number of seconds that a message will not be processable by any other consumers. You should set this value to be longer than your expected job runtime to prevent other processes from picking up an running job. See the SQS Visibility Timeout Documentation at https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html.") { |a| out[:visibility_timeout] = a }
opts.on("-s", "--shutdown_timeout INTEGER", Integer, "The amount of time to wait for a clean shutdown. Jobs that are unable to complete in this time will not be deleted from the SQS queue and will be retryable after the visibility timeout.") { |a| out[:shutdown_timeout] = a }
}
Expand Down