Fixes rails#262: Automatic worker process recycling
This PR adds two new configuration parameters:
* recycle_on_oom to the Worker (via queue.yml)
* calc_memory_usage as a global parameter (via application.rb,
  environment/*.rb, or an initializer)

There are no specific unit requirements placed on either of these new
parameters. What's important is that they use the same units so the two
values are directly comparable.

For example, if the calc_memory_usage proc returns 300 MB as 300 (i.e., in
megabytes), then the recycle_on_oom value set on the worker should be 300 too.

Any worker without recycle_on_oom is not impacted in any way.
If calc_memory_usage is nil (the default), then OOM checking is
turned off for all workers under the control of this Supervisor.

The check for OOM is made after the Job has run to completion and
before the SolidQueue worker does any additional processing.

The single biggest change to SolidQueue, and the one that probably requires
the most review, is moving job.unblock_next_blocked_job out of
ClaimedExecution and up one level into Pool.  The rationale
for this change is that the ensure block on the Job execution
is not guaranteed to run if the system / thread is forcibly shut down
while the job is in flight.  However, the ensure at the thread level *does*
seem to get called reliably on forced shutdowns.

Given my almost assuredly incomplete understanding of the concurrency
implementation, despite Rosa working very hard to help me grok it,
there is some risk that this change is wrong.

My logic for this change is as follows:
* A job that completes successfully would have released its lock -- no
  change
* A job that completes by way of an unhandled exception would have
  released its lock -- no change
* A job that was killed in flight because of a worker recycle_on_oom
  (or an ugly restart outside the user's control -- again, looking
  at you, Heroku) needs to release its lock -- there is no guarantee
  that it's going to be the job that starts on the worker restart.  If
  it doesn't release its lock in this use case, then that worker
  could find itself waiting on the dispatcher (I think) to expire
  Semaphores before it is able to take on new work.

Small fix
hms committed Sep 17, 2024
1 parent 67b964d commit c40c531
Showing 10 changed files with 422 additions and 11 deletions.
110 changes: 107 additions & 3 deletions README.md
@@ -53,7 +53,6 @@ For small projects, you can run Solid Queue on the same machine as your webserve

**Note**: future changes to the schema will come in the form of regular migrations.


### Single database configuration

Running Solid Queue in a separate database is recommended, but it's also possible to use one single database for both the app and the queue. Just follow these steps:
@@ -99,7 +98,6 @@ By default, Solid Queue will try to find your configuration under `config/queue.
bin/jobs -c config/calendar.yml
```


This is what this configuration looks like:

```yml
@@ -236,6 +234,7 @@ There are several settings that control how Solid Queue works that you can set a
- `preserve_finished_jobs`: whether to keep finished jobs in the `solid_queue_jobs` table—defaults to `true`.
- `clear_finished_jobs_after`: period to keep finished jobs around, in case `preserve_finished_jobs` is true—defaults to 1 day. **Note:** Right now, there's no automatic cleanup of finished jobs. You'd need to do this by periodically invoking `SolidQueue::Job.clear_finished_in_batches`, but this will happen automatically in the near future.
- `default_concurrency_control_period`: the value to be used as the default for the `duration` parameter in [concurrency controls](#concurrency-controls). It defaults to 3 minutes.
- `calc_memory_usage`: a proc that returns the memory consumption of the process(es) that you want to measure. It is called with the Worker process PID and runs in the context of the Worker that is configured with `recycle_on_oom`. [Read more](#memory-consumption).

## Errors when enqueuing

@@ -428,7 +427,112 @@ my_periodic_resque_job:
schedule: "*/5 * * * *"
```

and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any
`solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once
each time.

## Recycle On OOM

This feature recycles / restarts a worker whenever it exceeds the specified memory threshold. This is particularly
useful for jobs with high memory consumption or when deploying in a memory-constrained environment.

If the result of the `calc_memory_usage` Proc is greater than the `recycle_on_oom` value configured on a specific
worker, that worker will restart. It's important that the units returned by the `calc_memory_usage` Proc match the units
of the `recycle_on_oom` value.
For instance, if the `calc_memory_usage` Proc returns a value in MB (i.e., 300 vs. 300_000_000), the `recycle_on_oom` value
should also be specified in MB.

Using the `get_process_mem` gem, and rounding its result to an integer value in MB, you can configure Solid Queue as
follows:

```ruby
# config/application.rb, config/environments/production.rb, or
# config/initializers/solid_queue.rb
Rails.application.config.solid_queue.calc_memory_usage = ->(pid) { GetProcessMem.new(pid).mb.round(0) }
```

Here is an example of configuring a worker to recycle when memory usage exceeds 200MB:

```yml
worker:
  queues: "*"
  threads: 3
  polling_interval: 2
  recycle_on_oom: 200
```

You can also use the `calc_memory_usage` Proc to compute the memory usage across multiple processes:

```ruby
SolidQueue.configure do |config|
  config.calc_memory_usage = ->(_) do
    SolidQueue::Process.pluck(:pid).sum do |pid|
      GetProcessMem.new(pid).mb.round(0)
    rescue StandardError
      0 # just in case the process for the pid is no longer running
    end
  end
end
```

Then, set the worker to recycle based on the aggregate maximum memory usage of all processes:

```yml
worker:
  queues: "*"
  threads: 3
  polling_interval: 2
  recycle_on_oom: 512
```

Be cautious when using this feature, as it can lead to restarting the worker after each job if not properly configured.
It is advisable to be especially careful when using threads with workers configured with `recycle_on_oom`.
For example, with two queues, `slow_low_memory` and `fast_high_memory`, the `slow_low_memory` jobs could easily end up
never completing, because the `fast_high_memory` jobs keep triggering the Worker to recycle before the `slow_low_memory`
jobs have had enough time to run to completion.
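
One possible mitigation, sketched here under assumptions (the `workers:` array form of `queue.yml` and illustrative thread counts), is to isolate the memory-heavy queue on its own recycling worker so the slow queue is never interrupted by a recycle:

```yml
workers:
  - queues: slow_low_memory
    threads: 1
    polling_interval: 2
  - queues: fast_high_memory
    threads: 3
    polling_interval: 2
    recycle_on_oom: 200
```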

### A Brief Digression
This is a good time to mention that if you choose to use `recycle_on_oom` with threads, your jobs *really, really should*
be **idempotent** -- a fancy way of saying that a job could easily be started and stopped multiple times
(see the previous paragraph), so it is critical that the job be designed to tolerate multiple runs before it completes
without doing anything *"unseemly"* (such as emailing a customer the same message on each restart).
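
As a minimal sketch of that idea (the `Receipt` model, its `sent_at` column, and `ReceiptMailer` are hypothetical, not part of Solid Queue), the job checks durable state before performing its side effect, so a re-run after a recycle doesn't email the customer twice:

```ruby
# Hypothetical example -- Receipt, sent_at, and ReceiptMailer are assumptions.
class SendReceiptJob < ApplicationJob
  def perform(receipt_id)
    receipt = Receipt.find(receipt_id)

    # Skip the side effect if a previous (possibly interrupted) run already did it.
    return if receipt.sent_at.present?

    ReceiptMailer.receipt_email(receipt).deliver_now
    receipt.update!(sent_at: Time.current)
  end
end
```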

### Finishing recycle_on_oom
Any time a Worker is recycled due to memory consumption, it will emit a standard Solid Queue log message labeled "Worker
OOM", reporting the memory usage that triggered the restart and the vital statistics of the Worker process.
Solid Queue will also output its standard messaging about the Worker starting and registering.

Other ideas that might help in memory-constrained environments include:

```ruby
SolidQueue.on_start do
  # If supported by your environment.
  # This setting will be inherited by all processes started by this Supervisor, including recycled Workers.
  GC.auto_compact = true
  Process.warmup
end
```

and

```ruby
SolidQueue.on_worker_start { Process.warmup }
```

Also, because `recycle_on_oom` must be greater than zero to be enabled (a value of `0` is rejected as invalid and disables the feature), setting it to a very low value such as `1`:

```yml
worker:
  queues: "*"
  threads: 3
  polling_interval: 2
  recycle_on_oom: 1
```

will effectively restart the worker at the end of every job.

Finally, triggering a full GC via `after_perform`, `around_perform`, or at the end of your Job can't hurt, as it will
run prior to the memory check by the Worker.
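
As a minimal sketch (using Active Job's standard `after_perform` callback; the job class and its argument are hypothetical, and `GC.start` is just one option):

```ruby
class HeavyReportJob < ApplicationJob
  # Trigger a full GC after each run so memory is reclaimed
  # before the Worker performs its OOM check.
  after_perform do |_job|
    GC.start
  end

  def perform(report_id)
    # ... memory-hungry work ...
  end
end
```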

## Inspiration

2 changes: 0 additions & 2 deletions app/models/solid_queue/claimed_execution.rb
@@ -65,8 +65,6 @@ def perform
else
failed_with(result.error)
end
ensure
job.unblock_next_blocked_job
end

def release
1 change: 1 addition & 0 deletions lib/solid_queue.rb
@@ -40,6 +40,7 @@ module SolidQueue
mattr_accessor :preserve_finished_jobs, default: true
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes
mattr_accessor :calc_memory_usage, default: nil

delegate :on_start, :on_stop, to: Supervisor

14 changes: 14 additions & 0 deletions lib/solid_queue/log_subscriber.rb
@@ -61,6 +61,20 @@ def enqueue_recurring_task(event)
end
end

def recycle_worker(event)
process = event.payload[:process]

attributes = {
memory_used: event.payload[:memory_used],
pid: process.pid,
hostname: process.hostname,
process_id: process.process_id,
name: process.name
}

warn formatted_event(event, action: "#{process.kind} OOM", **attributes)
end

def start_process(event)
process = event.payload[:process]

16 changes: 12 additions & 4 deletions lib/solid_queue/pool.rb
@@ -15,15 +15,23 @@ def initialize(size, on_idle: nil)
@mutex = Mutex.new
end

def post(execution)
def post(execution, worker)
available_threads.decrement

future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution|
future = Concurrent::Future.new(args: [ execution, worker ], executor: executor) do |thread_execution, worker_execution|
wrap_in_app_executor do
thread_execution.perform
ensure
available_threads.increment
mutex.synchronize { on_idle.try(:call) if idle? }
wrap_in_app_executor do
execution.job.unblock_next_blocked_job
end

if worker_execution.oom?
worker_execution.recycle(execution)
else
available_threads.increment
mutex.synchronize { on_idle.try(:call) if idle? }
end
end
end

69 changes: 69 additions & 0 deletions lib/solid_queue/processes/recyclable.rb
@@ -0,0 +1,69 @@
# frozen_string_literal: true

require "active_support/concern"

module SolidQueue::Processes
  module Recyclable
    extend ActiveSupport::Concern

    included do
      attr_reader :max_memory, :calc_memory_usage
    end

    def recyclable_setup(**options)
      return unless configured?(options)

      set_max_memory(options[:recycle_on_oom])
      set_calc_memory_usage if max_memory
      SolidQueue.logger.error { "Recycle on OOM is disabled for worker #{pid}" } unless oom_configured?
    end

    def recycle(execution = nil)
      return false if !oom_configured? || stopped?

      memory_used = calc_memory_usage.call(pid)
      return false unless memory_exceeded?(memory_used)

      SolidQueue.instrument(:recycle_worker, process: self, memory_used: memory_used, class_name: execution&.job&.class_name) do
        pool.shutdown
        stop
      end

      true
    end

    def oom?
      oom_configured? && calc_memory_usage.call(pid) > max_memory
    end

    private

      def configured?(options)
        options.key?(:recycle_on_oom)
      end

      def oom_configured?
        @oom_configured ||= max_memory.present? && calc_memory_usage.present?
      end

      def memory_exceeded?(memory_used)
        memory_used > max_memory
      end

      def set_max_memory(max_memory)
        if max_memory > 0
          @max_memory = max_memory
        else
          SolidQueue.logger.error { "Invalid value for recycle_on_oom: #{max_memory}." }
        end
      end

      def set_calc_memory_usage
        if SolidQueue.calc_memory_usage.respond_to?(:call)
          @calc_memory_usage = SolidQueue.calc_memory_usage
        else
          SolidQueue.logger.error { "SolidQueue.calc_memory_usage provider not configured." }
        end
      end
  end
end
7 changes: 5 additions & 2 deletions lib/solid_queue/worker.rb
@@ -3,6 +3,7 @@
module SolidQueue
class Worker < Processes::Poller
include LifecycleHooks
include Processes::Recyclable

after_boot :run_start_hooks
before_shutdown :run_stop_hooks
@@ -11,6 +12,7 @@ class Worker < Processes::Poller

def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)
recyclable_setup(**options)

@queues = Array(options[:queues])
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
@@ -19,14 +21,15 @@ def initialize(**options)
end

def metadata
super.merge(queues: queues.join(","), thread_pool_size: pool.size)
super.then { _1.merge(queues: queues.join(","), thread_pool_size: pool.size) }
.then { oom_configured? ? _1.merge(recycle_on_oom: max_memory) : _1 }
end

private
def poll
claim_executions.then do |executions|
executions.each do |execution|
pool.post(execution)
pool.post(execution, self)
end

executions.size
7 changes: 7 additions & 0 deletions test/dummy/app/jobs/recycle_job.rb
@@ -0,0 +1,7 @@
# frozen_string_literal: true

class RecycleJob < ApplicationJob
  def perform(nap = nil)
    sleep(nap) unless nap.nil?
  end
end
9 changes: 9 additions & 0 deletions test/dummy/app/jobs/recycle_with_concurrency_job.rb
@@ -0,0 +1,9 @@
# frozen_string_literal: true

class RecycleWithConcurrencyJob < ApplicationJob
  limits_concurrency key: ->(nap = nil) { }

  def perform(nap = nil)
    sleep(nap) unless nap.nil?
  end
end