Use a dedicated thread for each connection #70

Closed
wants to merge 1 commit
13 changes: 8 additions & 5 deletions lib/solid_cache/cluster/async_execution.rb
@@ -8,17 +8,20 @@ def initialize(options)
 
       private
         def async(&block)
-          # Need current shard right now, not when block is called
-          current_shard = Entry.current_shard
           @executor << ->() do
             wrap_in_rails_executor do
-              with_shard(current_shard) do
-                block.call(current_shard)
-              end
+              block.call
             end
           end
         end
 
+        def async_on_current_shard(&block)
+          shard = Entry.current_shard
+          async do
+            execute_on_shard(shard) { block.call }
+          end
+        end
+
         def wrap_in_rails_executor
           if SolidCache.executor
             SolidCache.executor.wrap { yield }
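Note: as a minimal standalone sketch of the pattern above (the AsyncRunner name and its wiring are illustrative assumptions, not SolidCache's API), work is queued onto a bounded executor and the shard is captured before the block is handed off, so it is still known when the block runs on the background thread:

    require "concurrent"

    # Illustrative stand-in for the cluster's executor wiring (assumed names).
    class AsyncRunner
      def initialize
        # Bounded queue; when it is full, the submitting thread runs the work itself.
        @executor = Concurrent::SingleThreadExecutor.new(max_queue: 10, fallback_policy: :caller_runs)
      end

      def async(&block)
        # The block runs later, on the executor's thread.
        @executor << -> { block.call }
      end

      def async_on_current_shard(current_shard, &block)
        # Capture the shard now; the submitting thread's shard may have
        # changed by the time the executor gets to the block.
        async { block.call(current_shard) }
      end
    end

    runner = AsyncRunner.new
    runner.async_on_current_shard(:shard_one) { |shard| puts "running work for #{shard}" }
    sleep 0.1  # give the background thread a moment before the script exits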
79 changes: 48 additions & 31 deletions lib/solid_cache/cluster/connection_handling.rb
@@ -37,6 +37,12 @@ def setup!
 
         if @shards.count > 1
           @consistent_hash = MaglevHash.new(@nodes.keys)
+          # Using a dedicated thread for each shard avoids ActiveRecord connection switching.
+          # Connection switching is expensive because the connection is tested before being
+          # checked out, which involves a round trip to the database.
+          @executors = @shards.to_h { |shard| [shard, Concurrent::SingleThreadExecutor.new(max_queue: 10, fallback_policy: :caller_runs)] }
+        else
+          @executors = {}
         end
 
         @setup = true
@@ -54,38 +60,41 @@ def nodes
         @nodes
       end
 
-      def writing_all_shards
+      def writing_all_shards(&block)
         return enum_for(:writing_all_shards) unless block_given?
 
-        shards.each do |shard|
-          with_shard(shard, async: async_writes) do
-            yield
-          end
-        end
+        shards \
+          .map { |shard| with_shard(shard, async: async_writes, &block) }
+          .then { |results| realise_all(results) }
       end
 
       def writing_across_shards(list:, trim: false)
-        across_shards(list:, async: async_writes) do |list|
+        results = across_shards(list:, async: async_writes) do |list|
           result = yield list
           trim(list.size) if trim
           result
         end
+
+        realise_all(results)
       end
 
       def reading_across_shards(list:)
-        across_shards(list:) { |list| yield list }
+        across_shards(list:) { |list| yield list }.then { |results| realise_all(results) }
       end
 
       def writing_shard(normalized_key:, trim: false)
-        with_shard(shard_for_normalized_key(normalized_key), async: async_writes) do
+        result = with_shard(shard_for_normalized_key(normalized_key), async: async_writes) do
           result = yield
           trim(1) if trim
           result
         end
+
+        realise(result)
       end
 
       def reading_shard(normalized_key:)
-        with_shard(shard_for_normalized_key(normalized_key)) { yield }
+        result = with_shard(shard_for_normalized_key(normalized_key)) { yield }
+        realise(result)
       end
 
       def active_record_instrumentation?
@@ -95,13 +104,13 @@ def active_record_instrumentation?
       private
         attr_reader :consistent_hash
 
-        def with_shard(shard, async: false)
-          if shard
-            Record.connected_to(shard: shard) do
-              configure_for_query(async: async) { yield }
-            end
+        def with_shard(shard, async: false, &block)
+          if async
+            async { execute_on_shard(shard, &block) }
+          elsif (executor = @executors[shard])
+            with_executor(executor) { execute_on_shard(shard, &block) }
           else
-            configure_for_query(async: async) { yield }
+            execute_on_shard(shard, &block)
           end
         end
 
@@ -126,31 +135,39 @@ def shard_for_normalized_key(normalized_key)
           nodes[node]
         end
 
-        def configure_for_query(async:)
-          async_if_required(async) do
-            disable_active_record_instrumentation_if_required do
-              yield
-            end
+        def disable_active_record_instrumentation_if_required(&block)
+          if active_record_instrumentation?
+            block.call
+          else
+            Record.disable_instrumentation(&block)
           end
         end
 
-        def async_if_required(required)
-          if required
-            async { yield }
+        def execute_on_shard(shard, &block)
+          if shard
+            Record.connected_to(shard: shard) do
+              disable_active_record_instrumentation_if_required(&block)
+            end
           else
-            yield
+            disable_active_record_instrumentation_if_required(&block)
           end
         end
 
-        def disable_active_record_instrumentation_if_required
-          if active_record_instrumentation?
-            yield
+        def with_executor(executor, &block)
+          Concurrent::Promise.execute(executor: executor, &block)
+        end
+
+        def realise(result)
+          if result.is_a?(Concurrent::Promise)
+            result.value.tap { raise result.reason if result.rejected? }
           else
-            Record.disable_instrumentation do
-              yield
-            end
+            result
           end
         end
+
+        def realise_all(results)
+          results.map { |result| realise(result) }
+        end
     end
   end
 end
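Note: to make the threading model introduced here easier to follow, a self-contained sketch using only concurrent-ruby (the shard names, the on_shard helper, and the fake work are assumptions, not the gem's code): each shard owns a single-threaded executor, work is submitted as a Concurrent::Promise pinned to that executor, and a realise-style helper blocks on the promise and re-raises any captured error.

    require "concurrent"

    # One long-lived thread per shard, so each thread can keep its own database
    # connection checked out and never pays for connection switching.
    SHARDS = %i[ shard_one shard_two ]  # illustrative shard names

    EXECUTORS = SHARDS.to_h do |shard|
      [ shard, Concurrent::SingleThreadExecutor.new(max_queue: 10, fallback_policy: :caller_runs) ]
    end

    # Run a block on the shard's dedicated thread; returns a promise immediately.
    def on_shard(shard, &block)
      Concurrent::Promise.execute(executor: EXECUTORS[shard], &block)
    end

    # Block until the promise settles and surface any error it captured.
    def realise(promise)
      promise.value.tap { raise promise.reason if promise.rejected? }
    end

    promises = SHARDS.map { |shard| on_shard(shard) { "result from #{shard}" } }
    puts promises.map { |promise| realise(promise) }

With fallback_policy: :caller_runs, a full queue degrades to running the work on the submitting thread rather than dropping it, so writes stay bounded without raising.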
4 changes: 1 addition & 3 deletions lib/solid_cache/cluster/trimming.rb
@@ -26,13 +26,11 @@ def trim(write_count)
         counter.increment(write_count * TRIM_DELETE_MULTIPLIER)
         value = counter.value
         if value > trim_batch_size && counter.compare_and_set(value, value - trim_batch_size)
-          async { trim_batch }
+          async_on_current_shard { trim_batch }
         end
       end
 
-
       private
-
         def trim_batch
           candidates = Entry.order(:id).limit(trim_batch_size * TRIM_SELECT_MULTIPLIER).select(:id, :created_at).to_a
           candidates.select! { |entry| entry.created_at < max_age.seconds.ago } unless cache_full?
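Note: a small sketch of the counter logic that feeds async_on_current_shard above (the multiplier value and the stand-in trim job are assumptions): the atomic compare_and_set ensures that, of the many writers incrementing the counter, only one claims each batch and schedules a trim.

    require "concurrent"

    TRIM_DELETE_MULTIPLIER = 2  # assumed value for illustration
    trim_batch_size = 100
    counter = Concurrent::AtomicFixnum.new(0)

    schedule_trim = -> { puts "trim batch scheduled" }  # stand-in for async_on_current_shard { trim_batch }

    write_count = 60
    counter.increment(write_count * TRIM_DELETE_MULTIPLIER)
    value = counter.value
    # Only the caller that wins the compare_and_set claims this batch.
    if value > trim_batch_size && counter.compare_and_set(value, value - trim_batch_size)
      schedule_trim.call
    end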