-
Notifications
You must be signed in to change notification settings - Fork 56
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Extract SolidCache::Cluster, just one cluster for now * Change config to accept clusters/cluster * Write to multiple clusters, read from first * Async writes to other clusters * Update checkout action * Fix merge errors * Update readme with cluster config
- Loading branch information
Showing
26 changed files
with
682 additions
and
415 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
|
||
module SolidCache | ||
class Cluster | ||
require "solid_cache/cluster/hash_ring" | ||
require "solid_cache/cluster/connection_handling" | ||
require "solid_cache/cluster/async_execution" | ||
require "solid_cache/cluster/trimming" | ||
require "solid_cache/cluster/stats" | ||
|
||
include ConnectionHandling, AsyncExecution | ||
include Trimming | ||
include Stats | ||
|
||
def initialize(options = {}) | ||
super(options) | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
module SolidCache | ||
class Cluster | ||
module AsyncExecution | ||
def initialize(options) | ||
super() | ||
@executor = Concurrent::SingleThreadExecutor.new(max_queue: 100, fallback_policy: :discard) | ||
end | ||
|
||
private | ||
def async(&block) | ||
# Need current shard right now, not when block is called | ||
current_shard = Entry.current_shard | ||
@executor << ->() do | ||
wrap_in_rails_executor do | ||
with_shard(current_shard) do | ||
block.call(current_shard) | ||
end | ||
end | ||
end | ||
end | ||
|
||
def wrap_in_rails_executor | ||
if SolidCache.executor | ||
SolidCache.executor.wrap { yield } | ||
else | ||
yield | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
module SolidCache | ||
class Cluster | ||
module ConnectionHandling | ||
attr_reader :async_writes | ||
|
||
def initialize(options = {}) | ||
super(options) | ||
@shards = options.delete(:shards) | ||
@async_writes = options.delete(:async_writes) | ||
end | ||
|
||
def writing_all_shards | ||
return enum_for(:writing_all_shards) unless block_given? | ||
|
||
shards.each do |shard| | ||
with_shard(shard) do | ||
async_if_required { yield } | ||
end | ||
end | ||
end | ||
|
||
def shards | ||
@shards || SolidCache.all_shard_keys || [nil] | ||
end | ||
|
||
def writing_across_shards(list:, trim: false) | ||
across_shards(list:) do |list| | ||
async_if_required do | ||
result = yield list | ||
trim(list.size) if trim | ||
result | ||
end | ||
end | ||
end | ||
|
||
def reading_across_shards(list:) | ||
across_shards(list:) { |list| yield list } | ||
end | ||
|
||
def writing_shard(normalized_key:, trim: false) | ||
with_shard(shard_for_normalized_key(normalized_key)) do | ||
async_if_required do | ||
result = yield | ||
trim(1) if trim | ||
result | ||
end | ||
end | ||
end | ||
|
||
def reading_shard(normalized_key:) | ||
with_shard(shard_for_normalized_key(normalized_key)) { yield } | ||
end | ||
|
||
private | ||
def with_shard(shard) | ||
if shard | ||
Record.connected_to(shard: shard) { yield } | ||
else | ||
yield | ||
end | ||
end | ||
|
||
def across_shards(list:) | ||
in_shards(list).map do |shard, list| | ||
with_shard(shard) { yield list } | ||
end | ||
end | ||
|
||
def in_shards(list) | ||
if shards.count == 1 | ||
{ shards.first => list } | ||
else | ||
list.group_by { |value| shard_for_normalized_key(value.is_a?(Hash) ? value[:key] : value) } | ||
end | ||
end | ||
|
||
def shard_for_normalized_key(normalized_key) | ||
hash_ring&.get_node(normalized_key) || shards&.first | ||
end | ||
|
||
def hash_ring | ||
@hash_ring ||= shards.count > 0 ? HashRing.new(shards) : nil | ||
end | ||
|
||
def async_if_required | ||
if async_writes | ||
async { yield } | ||
else | ||
yield | ||
end | ||
end | ||
end | ||
end | ||
end |
Oops, something went wrong.