diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 83d7d6d..64c17c6 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -31,7 +31,7 @@ jobs: TARGET_DB: ${{ matrix.database }} steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Setup Ruby and install gems uses: ruby/setup-ruby@v1 with: diff --git a/README.md b/README.md index 47de2f8..8de64ab 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # SolidCache -SolidCache is a database backed ActiveSupport cache store implementation. +SolidCache is a database-backed ActiveSupport cache store implementation. -Using SQL databases backed by solid state storage we can have caches that are much larger and cheaper than traditional memory only Redis or Memcached backed caches. +Using SQL databases backed by SSDs, we can have caches that are much larger and cheaper than traditional memory-only Redis or Memcached backed caches. Testing on HEY shows that reads and writes are 25%-50% slower than with a Redis cache. However this is not a significant percentage of the overall request time. @@ -9,14 +9,15 @@ If cache misses are expensive (up to 50x the cost of a hit on HEY), then there a ## Usage -To set solid cache as your Rails cache, you should add this to your environment config: +To set SolidCache as your Rails cache, you should add this to your environment config: ```ruby config.cache_store = :solid_cache_store ``` -SolidCache is a FIFO (first in, first out) cache. While this is not as efficient as an LRU cache, this is mitigated by the longer cache lifespans and it provides some advantages: +SolidCache is a FIFO (first in, first out) cache. While this is not as efficient as an LRU cache, this is mitigated by the longer cache lifespans. +A FIFO cache is much easier to manage: 1. We don't need to track when items are read 2. We can estimate and control the cache size by comparing the maximum and minimum IDs. 3. By deleting from one end of the table and adding at the other end we can avoid fragmentation (on MySQL at least). @@ -65,8 +66,8 @@ These can be set in your Rails configuration: Rails.application.configure do config.solid_cache.connects_to = { shards: { - shard1: { writing: :cache_primary_shard1, reading: :cache_primary_shard1 }, - shard2: { writing: :cache_primary_shard1, reading: :cache_primary_shard1 } + shard1: { writing: :cache_primary_shard1 }, + shard2: { writing: :cache_primary_shard1 } } } end @@ -77,11 +78,13 @@ end Solid cache supports these options in addition to the universal `ActiveSupport::Cache::Store` options. - `error_handler` - a Proc to call to handle any `ActiveRecord::ActiveRecordError`s that are raises (default: log errors as warnings) -- `shards` - an Array of the database shards to connect to (shard connects_to must be configured separately via the SolidCache engine config) - `trim_batch_size` - the batch size to use when deleting old records (default: `100`) - `max_age` - the maximum age of entries in the cache (default: `2.weeks.to_i`) - `max_entries` - the maximum number of entries allowed in the cache (default: `2.weeks.to_i`) +- `cluster` - a Hash of options for the cache database cluster, e.g. `{ shards: [:database1, :database2, :database3] }` +- `clusters` - an Array of Hashes for separate cache clusters (ignored if `:cluster` is set) +For more information on cache clusters, see [Sharding the cache](#sharding-the-cache). ### Cache trimming SolidCache tracks when we write to the cache. 
For every write it increments a counter by 1.25. Once the counter reaches the `trim_batch_size` it add a task to run on a cache trimming thread. That task will: @@ -92,7 +95,7 @@ SolidCache tracks when we write to the cache. For every write it increments a co Incrementing the counter by 1.25 per write allows us to trim the cache faster than we write to it if we need to. -Only triggering trimming when we write means that the if the cache is idle the background thread is also idle. +Only triggering trimming when we write means that if the cache is idle, the background thread is also idle. ### Using a dedicated cache database @@ -125,7 +128,7 @@ $ mv db/migrate/*.solid_cache.rb db/cache/migrate Set the engine configuration to point to the new database: ``` Rails.application.configure do - config.solid_cache.connects_to = { database: { writing: :cache, reading: :cache } } + config.solid_cache.connects_to = { default: { writing: :cache } } end ``` @@ -163,15 +166,41 @@ production: Rails.application.configure do config.solid_cache.connects_to = { shards: { - cache_shard1: { writing: :cache_shard1, reading: :cache_shard1 }, - cache_shard2: { writing: :cache_shard2, reading: :cache_shard2 }, - cache_shard3: { writing: :cache_shard3, reading: :cache_shard3 }, + cache_shard1: { writing: :cache_shard1 }, + cache_shard2: { writing: :cache_shard2 }, + cache_shard3: { writing: :cache_shard3 }, } } - config.cache_store = :solid_cache_store, shards: [ :cache_shard1, :cache_shard2, :cache_shard3 ] + config.cache_store = [ :solid_cache_store, cluster: { shards: [ :cache_shard1, :cache_shard2, :cache_shard3 ] } ] end ``` + +### Secondary cache clusters + +You can add secondary cache clusters. Reads will only be sent to the primary cluster (i.e. the first one listed). + +Writes will go to all clusters. Writes to the primary cluster are synchronous, while writes to the secondary clusters are asynchronous. 
+ +To specify multiple clusters, you can do: + +```ruby +Rails.application.configure do + config.solid_cache.connects_to = { + shards: { + cache_primary_shard1: { writing: :cache_primary_shard1 }, + cache_primary_shard2: { writing: :cache_primary_shard2 }, + cache_secondary_shard1: { writing: :cache_secondary_shard1 }, + cache_secondary_shard2: { writing: :cache_secondary_shard2 }, + } + } + + primary_cluster = { shards: [ :cache_primary_shard1, :cache_primary_shard2 ] } + secondary_cluster = { shards: [ :cache_secondary_shard1, :cache_secondary_shard2 ] } + config.cache_store = [ :solid_cache_store, clusters: [ primary_cluster, secondary_cluster ] ] +end +``` + ### Enabling encryption Add this to an initializer: diff --git a/lib/solid_cache/async_execution.rb b/lib/solid_cache/async_execution.rb deleted file mode 100644 index 5772441..0000000 --- a/lib/solid_cache/async_execution.rb +++ /dev/null @@ -1,29 +0,0 @@ -module SolidCache - module AsyncExecution - def initialize(options) - super(options) - @executor = Concurrent::SingleThreadExecutor.new(max_queue: 100, fallback_policy: :discard) - end - - private - def async(&block) - # Need current shard right now, not when block is called - current_shard = Entry.current_shard - @executor << ->() do - wrap_in_rails_executor do - with_shard(current_shard) do - block.call(current_shard) - end - end - end - end - - def wrap_in_rails_executor - if SolidCache.executor - SolidCache.executor.wrap { yield } - else - yield - end - end - end -end diff --git a/lib/solid_cache/cluster.rb b/lib/solid_cache/cluster.rb new file mode 100644 index 0000000..028a97a --- /dev/null +++ b/lib/solid_cache/cluster.rb @@ -0,0 +1,18 @@ + +module SolidCache + class Cluster + require "solid_cache/cluster/hash_ring" + require "solid_cache/cluster/connection_handling" + require "solid_cache/cluster/async_execution" + require "solid_cache/cluster/trimming" + require "solid_cache/cluster/stats" + + include ConnectionHandling, AsyncExecution + include Trimming + include Stats + + def initialize(options = {}) + super(options) + end + end +end diff --git a/lib/solid_cache/cluster/async_execution.rb b/lib/solid_cache/cluster/async_execution.rb new file mode 100644 index 0000000..bb73da2 --- /dev/null +++ b/lib/solid_cache/cluster/async_execution.rb @@ -0,0 +1,31 @@ +module SolidCache + class Cluster + module AsyncExecution + def initialize(options) + super() + @executor = Concurrent::SingleThreadExecutor.new(max_queue: 100, fallback_policy: :discard) + end + + private + def async(&block) + # Need current shard right now, not when block is called + current_shard = Entry.current_shard + @executor << ->() do + wrap_in_rails_executor do + with_shard(current_shard) do + block.call(current_shard) + end + end + end + end + + def wrap_in_rails_executor + if SolidCache.executor + SolidCache.executor.wrap { yield } + else + yield + end + end + end + end +end diff --git a/lib/solid_cache/cluster/connection_handling.rb b/lib/solid_cache/cluster/connection_handling.rb new file mode 100644 index 0000000..99cbb71 --- /dev/null +++ b/lib/solid_cache/cluster/connection_handling.rb @@ -0,0 +1,94 @@ +module SolidCache + class Cluster + module ConnectionHandling + attr_reader :async_writes + + def initialize(options = {}) + super(options) + @shards = options.delete(:shards) + @async_writes = options.delete(:async_writes) + end + + def writing_all_shards + return enum_for(:writing_all_shards) unless block_given? 
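+ # Visit each shard in turn, switching the Record connection and running + # the block through the async executor when async_writes is enabled.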
+ + shards.each do |shard| + with_shard(shard) do + async_if_required { yield } + end + end + end + + def shards + @shards || SolidCache.all_shard_keys || [nil] + end + + def writing_across_shards(list:, trim: false) + across_shards(list:) do |list| + async_if_required do + result = yield list + trim(list.size) if trim + result + end + end + end + + def reading_across_shards(list:) + across_shards(list:) { |list| yield list } + end + + def writing_shard(normalized_key:, trim: false) + with_shard(shard_for_normalized_key(normalized_key)) do + async_if_required do + result = yield + trim(1) if trim + result + end + end + end + + def reading_shard(normalized_key:) + with_shard(shard_for_normalized_key(normalized_key)) { yield } + end + + private + def with_shard(shard) + if shard + Record.connected_to(shard: shard) { yield } + else + yield + end + end + + def across_shards(list:) + in_shards(list).map do |shard, list| + with_shard(shard) { yield list } + end + end + + def in_shards(list) + if shards.count == 1 + { shards.first => list } + else + list.group_by { |value| shard_for_normalized_key(value.is_a?(Hash) ? value[:key] : value) } + end + end + + def shard_for_normalized_key(normalized_key) + hash_ring&.get_node(normalized_key) || shards&.first + end + + def hash_ring + @hash_ring ||= shards.count > 0 ? HashRing.new(shards) : nil + end + + def async_if_required + if async_writes + async { yield } + else + yield + end + end + end + end +end diff --git a/lib/solid_cache/cluster/hash_ring.rb b/lib/solid_cache/cluster/hash_ring.rb new file mode 100644 index 0000000..2bc63fe --- /dev/null +++ b/lib/solid_cache/cluster/hash_ring.rb @@ -0,0 +1,113 @@ +# Taken from the redis-rb gem (https://github.com/redis/redis-rb) + +# Copyright (c) 2009 Ezra Zygmuntowicz + +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: + +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +# frozen_string_literal: true +require 'zlib' +require 'digest/md5' + +module SolidCache + class Cluster + class HashRing + POINTS_PER_SERVER = 160 # this is the default in libmemcached + + attr_reader :ring, :sorted_keys, :replicas, :nodes + + # nodes is a list of objects that have a proper to_s representation. + # replicas indicates how many virtual points should be used pr. node, + # replicas are required to improve the distribution. + def initialize(nodes = [], replicas = POINTS_PER_SERVER) + @replicas = replicas + @ring = {} + @nodes = [] + @sorted_keys = [] + nodes.each do |node| + add_node(node) + end + end + + # Adds a `node` to the hash ring (including a number of replicas). 
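+ # Each replica is placed at the MD5-derived hash of "#{node}:#{i}". + # The points are kept sorted so `get_node` can binary-search for the + # nearest point at or below a key's CRC32 hash, wrapping to the highest + # point when the key hashes below the smallest one.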
+ def add_node(node) + @nodes << node + @replicas.times do |i| + key = server_hash_for("#{node}:#{i}") + @ring[key] = node + @sorted_keys << key + end + @sorted_keys.sort! + end + + def remove_node(node) + @nodes.reject! { |n| n.id == node } + @replicas.times do |i| + key = server_hash_for("#{node}:#{i}") + @ring.delete(key) + @sorted_keys.reject! { |k| k == key } + end + end + + # get the node in the hash ring for this key + def get_node(key) + hash = hash_for(key) + idx = binary_search(@sorted_keys, hash) + @ring[@sorted_keys[idx]] + end + + def iter_nodes(key) + return [nil, nil] if @ring.empty? + + crc = hash_for(key) + pos = binary_search(@sorted_keys, crc) + @ring.size.times do |n| + yield @ring[@sorted_keys[(pos + n) % @ring.size]] + end + end + + private + + def hash_for(key) + Zlib.crc32(key) + end + + def server_hash_for(key) + ::Digest::MD5.digest(key).unpack1("L>") + end + + # Find the closest index in HashRing with value <= the given value + def binary_search(ary, value) + upper = ary.size + lower = 0 + + while lower < upper + mid = (lower + upper) / 2 + if ary[mid] > value + upper = mid + else + lower = mid + 1 + end + end + + upper - 1 + end + end + end +end diff --git a/lib/solid_cache/cluster/stats.rb b/lib/solid_cache/cluster/stats.rb new file mode 100644 index 0000000..2c5ac23 --- /dev/null +++ b/lib/solid_cache/cluster/stats.rb @@ -0,0 +1,33 @@ +module SolidCache + class Cluster + module Stats + def initialize(options) + super(options) + end + + def stats + stats = { + shards: shards.count, + shards_stats: shards_stats + } + + end + + private + def shards_stats + writing_all_shards.to_h { |shard| [Entry.current_shard, shard_stats] } + end + + def shard_stats + oldest_created_at = Entry.order(:id).pick(:created_at) + + { + max_age: max_age, + oldest_age: oldest_created_at ? Time.now - oldest_created_at : nil, + max_entries: max_entries, + entries: Entry.id_range + } + end + end + end +end diff --git a/lib/solid_cache/cluster/trimming.rb b/lib/solid_cache/cluster/trimming.rb new file mode 100644 index 0000000..fb6c2af --- /dev/null +++ b/lib/solid_cache/cluster/trimming.rb @@ -0,0 +1,55 @@ +require "concurrent/atomic/atomic_fixnum" + +module SolidCache + class Cluster + module Trimming + # For every write that we do, we attempt to delete TRIM_DELETE_MULTIPLIER times as many records. + # This ensures there is downward pressure on the cache size while there is valid data to delete + TRIM_DELETE_MULTIPLIER = 1.25 + + # If deleting X records, we'll select X * TRIM_SELECT_MULTIPLIER and randomly delete X of those + # The selection doesn't lock so it allows more deletion concurrency, but some of the selected records + # might be deleted already. The delete multiplier should compensate for that. + TRIM_SELECT_MULTIPLIER = 3 + + attr_reader :trim_batch_size, :max_age, :max_entries + + def initialize(options = {}) + super(options) + @trim_batch_size = options.delete(:trim_batch_size) || 100 + @max_age = options.delete(:max_age) || 2.weeks.to_i + @max_entries = options.delete(:max_entries) + end + + def trim(write_count) + counter = trim_counters[Entry.current_shard] + counter.increment(write_count * TRIM_DELETE_MULTIPLIER) + value = counter.value + if value > trim_batch_size && counter.compare_and_set(value, value - trim_batch_size) + async { trim_batch } + end + end + + + private + + def trim_batch + candidates = Entry.order(:id).limit(trim_batch_size * TRIM_SELECT_MULTIPLIER).select(:id, :created_at).to_a + candidates.select! 
{ |entry| entry.created_at < max_age.seconds.ago } unless cache_full? + candidates = candidates.sample(trim_batch_size) + + Entry.delete(candidates.map(&:id)) if candidates.any? + end + + def trim_counters + # Pre-fill the first counter to prevent herding and to account + # for discarded counters from the last shutdown + @trim_counters ||= shards.to_h { |shard| [shard, Concurrent::AtomicFixnum.new(rand(trim_batch_size).to_i)] } + end + + def cache_full? + max_entries && max_entries < Entry.id_range + end + end + end +end diff --git a/lib/solid_cache/connection_handling.rb b/lib/solid_cache/connection_handling.rb deleted file mode 100644 index d16e3ae..0000000 --- a/lib/solid_cache/connection_handling.rb +++ /dev/null @@ -1,65 +0,0 @@ -require "solid_cache/hash_ring" - -module SolidCache - module ConnectionHandling - def initialize(options = {}) - super(options) - @shards = options.delete(:shards) - end - - def writing_all_shards - return enum_for(:writing_all_shards) unless block_given? - - shards.each do |shard| - with_shard(shard) { yield } - end - end - - def shards - @shards || SolidCache.all_shard_keys || [nil] - end - - private - def writing_across_shards(list:) - across_shards(list:) { |list| yield list } - end - - def reading_across_shards(list:) - across_shards(list:) { |list| yield list } - end - - def with_shard_for_key(normalized_key:) - with_shard(shard_for_normalized_key(normalized_key)) { yield } - end - - def with_shard(shard) - if shard - Record.connected_to(shard: shard) { yield } - else - yield - end - end - - def across_shards(list:) - in_shards(list).map do |shard, list| - with_shard(shard) { yield list } - end - end - - def in_shards(list) - if shards.count == 1 - { shards.first => list } - else - list.group_by { |value| shard_for_normalized_key(value.is_a?(Hash) ? value[:key] : value) } - end - end - - def shard_for_normalized_key(normalized_key) - hash_ring&.get_node(normalized_key) || shards&.first - end - - def hash_ring - @hash_ring ||= shards.count > 0 ? HashRing.new(shards) : nil - end - end -end diff --git a/lib/solid_cache/hash_ring.rb b/lib/solid_cache/hash_ring.rb deleted file mode 100644 index 54fd04f..0000000 --- a/lib/solid_cache/hash_ring.rb +++ /dev/null @@ -1,111 +0,0 @@ -# Taken from the redis-rb gem (https://github.com/redis/redis-rb) - -# Copyright (c) 2009 Ezra Zygmuntowicz - -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: - -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. - -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
- -# frozen_string_literal: true -require 'zlib' -require 'digest/md5' - -module SolidCache - class HashRing - POINTS_PER_SERVER = 160 # this is the default in libmemcached - - attr_reader :ring, :sorted_keys, :replicas, :nodes - - # nodes is a list of objects that have a proper to_s representation. - # replicas indicates how many virtual points should be used pr. node, - # replicas are required to improve the distribution. - def initialize(nodes = [], replicas = POINTS_PER_SERVER) - @replicas = replicas - @ring = {} - @nodes = [] - @sorted_keys = [] - nodes.each do |node| - add_node(node) - end - end - - # Adds a `node` to the hash ring (including a number of replicas). - def add_node(node) - @nodes << node - @replicas.times do |i| - key = server_hash_for("#{node}:#{i}") - @ring[key] = node - @sorted_keys << key - end - @sorted_keys.sort! - end - - def remove_node(node) - @nodes.reject! { |n| n.id == node } - @replicas.times do |i| - key = server_hash_for("#{node}:#{i}") - @ring.delete(key) - @sorted_keys.reject! { |k| k == key } - end - end - - # get the node in the hash ring for this key - def get_node(key) - hash = hash_for(key) - idx = binary_search(@sorted_keys, hash) - @ring[@sorted_keys[idx]] - end - - def iter_nodes(key) - return [nil, nil] if @ring.empty? - - crc = hash_for(key) - pos = binary_search(@sorted_keys, crc) - @ring.size.times do |n| - yield @ring[@sorted_keys[(pos + n) % @ring.size]] - end - end - - private - - def hash_for(key) - Zlib.crc32(key) - end - - def server_hash_for(key) - ::Digest::MD5.digest(key).unpack1("L>") - end - - # Find the closest index in HashRing with value <= the given value - def binary_search(ary, value) - upper = ary.size - lower = 0 - - while lower < upper - mid = (lower + upper) / 2 - if ary[mid] > value - upper = mid - else - lower = mid + 1 - end - end - - upper - 1 - end - end -end diff --git a/lib/solid_cache/stats.rb b/lib/solid_cache/stats.rb deleted file mode 100644 index 38424dc..0000000 --- a/lib/solid_cache/stats.rb +++ /dev/null @@ -1,27 +0,0 @@ -module SolidCache - module Stats - def stats - stats = { - shards: shards.count, - shards_stats: shards_stats - } - - end - - private - def shards_stats - writing_all_shards.to_h { |shard| [Entry.current_shard, shard_stats] } - end - - def shard_stats - oldest_created_at = SolidCache::Entry.order(:id).pick(:created_at) - - { - max_age: max_age, - oldest_age: oldest_created_at ? Time.now - oldest_created_at : nil, - max_entries: max_entries, - entries: SolidCache::Entry.id_range - } - end - end -end diff --git a/lib/solid_cache/store.rb b/lib/solid_cache/store.rb index 16c2315..3463958 100644 --- a/lib/solid_cache/store.rb +++ b/lib/solid_cache/store.rb @@ -1,14 +1,7 @@ -require "solid_cache/connection_handling" -require "solid_cache/async_execution" -require "solid_cache/trimming" -require "solid_cache/stats" +require "solid_cache/cluster" module SolidCache class Store < ActiveSupport::Cache::Store - include ConnectionHandling, AsyncExecution - include Trimming - include Stats - MAX_KEY_BYTESIZE = 1024 SQL_WILDCARD_CHARS = [ '_', '%' ] @@ -24,12 +17,21 @@ def self.supports_cache_versioning? prepend ActiveSupport::Cache::Strategy::LocalCache - attr_reader :max_key_bytesize + attr_reader :max_key_bytesize, :primary_cluster, :clusters def initialize(options = {}) super(options) @max_key_bytesize = MAX_KEY_BYTESIZE @error_handler = options.delete(:error_handler) || DEFAULT_ERROR_HANDLER + + clusters_options = options.key?(:cluster) ? 
[options.delete(:cluster)] : options.delete(:clusters) + clusters_options ||= [{}] + + @clusters = clusters_options.map.with_index do |cluster_options, index| + Cluster.new(options.merge(cluster_options).merge(async_writes: index != 0)) + end + + @primary_cluster = clusters.first end def delete_matched(matcher, options = {}) @@ -42,8 +44,8 @@ def delete_matched(matcher, options = {}) matcher = namespace_key(matcher, options) - writing_all_shards do - failsafe :decrement do + writing do + failsafe :delete_matched do Entry.delete_matched(matcher, batch_size: batch_size) end end @@ -53,7 +55,7 @@ def delete_matched(matcher, options = {}) def increment(name, amount = 1, options = nil) options = merged_options(options) key = normalize_key(name, options) - with_shard_for_key(normalized_key: key) do + writing_key(key) do failsafe :increment do Entry.increment(key, amount) end @@ -63,8 +65,8 @@ def increment(name, amount = 1, options = nil) def decrement(name, amount = 1, options = nil) options = merged_options(options) key = normalize_key(name, options) - with_shard_for_key(normalized_key: key) do - failsafe :increment do + writing_key(key) do + failsafe :decrement do Entry.increment(key, -amount) end end @@ -78,8 +80,8 @@ def clear(options = nil) raise NotImplementedError.new("#{self.class.name} does not support clear") end - def shard_for_key(key, options = nil) - shard_for_normalized_key(normalize_key(key, merged_options(options))) + def stats + primary_cluster.stats end private @@ -88,7 +90,7 @@ def read_entry(key, **options) end def read_serialized_entry(key, raw: false, **options) - with_shard_for_key(normalized_key: key) do + primary_cluster.reading_shard(normalized_key: key) do failsafe(:read_entry) do Entry.get(key) end @@ -100,10 +102,9 @@ def write_entry(key, entry, raw: false, **options) payload = serialize_entry(entry, raw: raw, **options) write_serialized_entry(key, payload, raw: raw, **options) - with_shard_for_key(normalized_key: key) do + writing_key(key, trim: true) do failsafe(:write_entry, returning: false) do Entry.set(key, payload) - trim(1) true end end @@ -114,7 +115,7 @@ def write_serialized_entry(key, payload, raw: false, unless_exist: false, expire end def read_serialized_entries(keys) - results = reading_across_shards(list: keys) do |keys| + results = primary_cluster.reading_across_shards(list: keys) do |keys| failsafe(:read_multi_mget, returning: {}) do Entry.get_all(keys) end @@ -151,10 +152,9 @@ def write_multi_entries(entries, expires_in: nil, **options) write_serialized_entry(entries[:key], entries[:value]) end - writing_across_shards(list: serialized_entries) do |serialized_entries| + writing_list(serialized_entries, trim: true) do |serialized_entries| failsafe(:write_multi_entries) do Entry.set_all(serialized_entries) - trim(serialized_entries.count) true end end @@ -162,7 +162,7 @@ def write_multi_entries(entries, expires_in: nil, **options) end def delete_entry(key, **options) - with_shard_for_key(normalized_key: key) do + writing_key(key) do failsafe(:delete_entry, returning: false) do Entry.delete_by_key(key) end @@ -216,5 +216,33 @@ def failsafe(method, returning: nil) @error_handler&.call(method: method, exception: error, returning: returning) returning end + + def writing_key(key, trim: false) + writing_clusters do |cluster| + cluster.writing_shard(normalized_key: key, trim: trim) do + yield + end + end + end + + def writing_list(list, trim: false) + writing_clusters do |cluster| + cluster.writing_across_shards(list: list, trim: trim) do |list| + yield 
list + end + end + end + + def writing + writing_clusters do |cluster| + cluster.writing_all_shards do + yield + end + end + end + + def writing_clusters + clusters.map { |cluster| yield cluster }.first + end end end diff --git a/lib/solid_cache/trimming.rb b/lib/solid_cache/trimming.rb deleted file mode 100644 index ff2548a..0000000 --- a/lib/solid_cache/trimming.rb +++ /dev/null @@ -1,51 +0,0 @@ -require "concurrent/atomic/atomic_fixnum" - -module SolidCache - module Trimming - # For every write that we do, we attempt to delete TRIM_DELETE_MULTIPLIER times as many records. - # This ensures there is downward pressure on the cache size while there is valid data to delete - TRIM_DELETE_MULTIPLIER = 1.25 - - # If deleting X records, we'll select X * TRIM_SELECT_MULTIPLIER and randomly delete X of those - # The selection doesn't lock so it allows more deletion concurrency, but some of the selected records - # might be deleted already. The delete multiplier should compensate for that. - TRIM_SELECT_MULTIPLIER = 3 - - attr_reader :trim_batch_size, :max_age, :max_entries - - def initialize(options = {}) - super(options) - @trim_batch_size = options.delete(:trim_batch_size) || 100 - @max_age = options.delete(:max_age) || 2.weeks.to_i - @max_entries = options.delete(:max_entries) - end - - private - def trim(write_count) - counter = trim_counters[Entry.current_shard] - counter.increment(write_count * TRIM_DELETE_MULTIPLIER) - value = counter.value - if value > trim_batch_size && counter.compare_and_set(value, value - trim_batch_size) - async { trim_batch } - end - end - - def trim_batch - candidates = Entry.order(:id).limit(trim_batch_size * TRIM_SELECT_MULTIPLIER).select(:id, :created_at).to_a - candidates.select! { |entry| entry.created_at < max_age.seconds.ago } unless cache_full? - candidates = candidates.sample(trim_batch_size) - - Entry.delete(candidates.map(&:id)) if candidates.any? - end - - def trim_counters - # Pre-fill the first counter to prevent herding and to account - # for discarded counters from the last shutdown - @trim_counters ||= shards.to_h { |shard| [shard, Concurrent::AtomicFixnum.new(rand(trim_batch_size))] } - end - - def cache_full? 
- max_entries && max_entries < Entry.id_range - end - end -end diff --git a/test/dummy/config/application.rb b/test/dummy/config/application.rb index a0d4ce5..edb091a 100644 --- a/test/dummy/config/application.rb +++ b/test/dummy/config/application.rb @@ -27,7 +27,11 @@ class Application < Rails::Application config.solid_cache.connects_to = { shards: { default: { writing: :primary, reading: :primary_replica }, - shard_one: { writing: :primary_shard_one, reading: :primary_shard_one_replica } + default2: { writing: :primary_shard_one, reading: :primary_shard_one_replica }, + primary_shard_one: { writing: :primary_shard_one }, + primary_shard_two: { writing: :primary_shard_two }, + secondary_shard_one: { writing: :secondary_shard_one }, + secondary_shard_two: { writing: :secondary_shard_two } } } end diff --git a/test/dummy/config/database.yml b/test/dummy/config/database.yml index 79bb1e1..a933525 100644 --- a/test/dummy/config/database.yml +++ b/test/dummy/config/database.yml @@ -1,3 +1,13 @@ +<% + def database(name) + if ENV["TARGET_DB"]=="mysql" || ENV["TARGET_DB"]=="postgres" + name + else + "db/#{name}.sqlite3" + end + end +%> + <% if ENV["TARGET_DB"]=="mysql" %> # MySQL @@ -8,39 +18,6 @@ default: &default host: "127.0.0.1" port: 33060 -development: - primary: - <<: *default - database: database_cache_development - primary_replica: - <<: *default - database: database_cache_development - replica: true - primary_shard_one: - <<: *default - database: database_cache1_development - primary_shard_one_replica: - <<: *default - database: database_cache1_development - replica: true - -test: - primary: - <<: *default - database: database_cache_test - primary_replica: - <<: *default - database: database_cache_test - replica: true - primary_shard_one: - <<: *default - database: database_cache1_test - primary_shard_one_replica: - <<: *default - database: database_cache1_test - replica: true - - <% elsif ENV["TARGET_DB"]=="postgres" %> # Postgres @@ -52,82 +29,61 @@ default: &default host: "127.0.0.1" port: 55432 -development: - primary: - <<: *default - database: database_cache_development - primary_replica: - <<: *default - database: database_cache_development - replica: true - primary_shard_one: - <<: *default - database: database_cache1_development - primary_shard_one_replica: - <<: *default - database: database_cache1_development - replica: true - -test: - primary: - <<: *default - database: database_cache_test - primary_replica: - <<: *default - database: database_cache_test - replica: true - primary_shard_one: - <<: *default - database: database_cache1_test - primary_shard_one_replica: - <<: *default - database: database_cache1_test - replica: true - <% else %> -# SQLite. Versions 3.8.0 and up are supported. 
-# gem install sqlite3 -# -# Ensure the SQLite 3 gem is defined in your Gemfile -# gem "sqlite3" -# + default: &default adapter: sqlite3 pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 5 } %> timeout: 5000 +<% end %> + development: primary: <<: *default - database: db/development.sqlite3 + database: <%= database("database_cache_development") %> primary_replica: <<: *default - database: db/development.sqlite3 + database: <%= database("database_cache_development") %> replica: true primary_shard_one: <<: *default - database: db/development1.sqlite3 + database: <%= database("database_cache1_development") %> primary_shard_one_replica: <<: *default - database: db/development1.sqlite3 + database: <%= database("database_cache1_development") %> replica: true + primary_shard_two: + <<: *default + database: <%= database("database_cache2_development") %> + secondary_shard_one: + <<: *default + database: <%= database("database_cache1s_development") %> + secondary_shard_two: + <<: *default + database: <%= database("database_cache2s_development") %> -# Warning: The database defined as "test" will be erased and -# re-generated from your development database when you run "rake". -# Do not set this db to the same as development or production. test: primary: <<: *default - database: db/test.sqlite3 + database: <%= database("database_cache_development") %> primary_replica: <<: *default - database: db/test.sqlite3 + database: <%= database("database_cache_development") %> replica: true primary_shard_one: <<: *default - database: db/test1.sqlite3 + database: <%= database("database_cache1_development") %> primary_shard_one_replica: <<: *default - database: db/test1.sqlite3 + database: <%= database("database_cache1_development") %> replica: true -<% end %> + primary_shard_two: + <<: *default + database: <%= database("database_cache2_development") %> + secondary_shard_one: + <<: *default + database: <%= database("database_cache1s_development") %> + secondary_shard_two: + <<: *default + database: <%= database("database_cache2s_development") %> diff --git a/test/dummy/db/primary_shard_two_schema.rb b/test/dummy/db/primary_shard_two_schema.rb new file mode 100644 index 0000000..b14659b --- /dev/null +++ b/test/dummy/db/primary_shard_two_schema.rb @@ -0,0 +1,21 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. 
+ +ActiveRecord::Schema[7.0].define(version: 2023_07_24_121448) do + create_table "solid_cache_entries", force: :cascade do |t| + t.binary "key", limit: 1024, null: false + t.binary "value", limit: 536870912, null: false + t.datetime "created_at", null: false + t.index ["key"], name: "index_solid_cache_entries_on_key", unique: true + end + +end diff --git a/test/dummy/db/secondary_shard_one_schema.rb b/test/dummy/db/secondary_shard_one_schema.rb new file mode 100644 index 0000000..b14659b --- /dev/null +++ b/test/dummy/db/secondary_shard_one_schema.rb @@ -0,0 +1,21 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. + +ActiveRecord::Schema[7.0].define(version: 2023_07_24_121448) do + create_table "solid_cache_entries", force: :cascade do |t| + t.binary "key", limit: 1024, null: false + t.binary "value", limit: 536870912, null: false + t.datetime "created_at", null: false + t.index ["key"], name: "index_solid_cache_entries_on_key", unique: true + end + +end diff --git a/test/dummy/db/secondary_shard_two_schema.rb b/test/dummy/db/secondary_shard_two_schema.rb new file mode 100644 index 0000000..b14659b --- /dev/null +++ b/test/dummy/db/secondary_shard_two_schema.rb @@ -0,0 +1,21 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. 
+ +ActiveRecord::Schema[7.0].define(version: 2023_07_24_121448) do + create_table "solid_cache_entries", force: :cascade do |t| + t.binary "key", limit: 1024, null: false + t.binary "value", limit: 536870912, null: false + t.datetime "created_at", null: false + t.index ["key"], name: "index_solid_cache_entries_on_key", unique: true + end + +end diff --git a/test/models/solid_cache/record_test.rb b/test/models/solid_cache/record_test.rb index a5d25c3..ab3950a 100644 --- a/test/models/solid_cache/record_test.rb +++ b/test/models/solid_cache/record_test.rb @@ -4,7 +4,7 @@ module SolidCache class RecordTest < ActiveSupport::TestCase test "set and get cache entries" do shards = SolidCache::Record.each_shard.map { SolidCache::Record.current_shard } - assert_equal [ :default, :shard_one ], shards + assert_equal [ :default, :default2, :primary_shard_one, :primary_shard_two, :secondary_shard_one, :secondary_shard_two ], shards end end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 0856266..4c4aa65 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -17,11 +17,13 @@ end def lookup_store(options = {}) - ActiveSupport::Cache.lookup_store(:solid_cache_store, { namespace: @namespace }.merge(options)) + store_options = { namespace: @namespace }.merge(options) + store_options.merge!(cluster: { shards: [:default, :default2] }) unless store_options.key?(:cluster) || store_options.key?(:clusters) + ActiveSupport::Cache.lookup_store(:solid_cache_store, store_options) end def send_entries_back_in_time(distance) - @cache.writing_all_shards do + @cache.primary_cluster.writing_all_shards do SolidCache::Entry.all.each do |entry| entry.update_columns(created_at: entry.created_at - distance) end diff --git a/test/unit/async_executor_test.rb b/test/unit/async_executor_test.rb index 6303a48..7bd97db 100644 --- a/test/unit/async_executor_test.rb +++ b/test/unit/async_executor_test.rb @@ -15,7 +15,7 @@ def test_async_errors_are_reported error_subscriber = ErrorSubscriber.new Rails.error.subscribe(error_subscriber) - @cache.send(:async) do + @cache.primary_cluster.send(:async) do raise "Boom!" 
end diff --git a/test/unit/cluster_test.rb b/test/unit/cluster_test.rb new file mode 100644 index 0000000..18ade46 --- /dev/null +++ b/test/unit/cluster_test.rb @@ -0,0 +1,122 @@ +require "test_helper" + +class ClusterTest < ActiveSupport::TestCase + setup do + @cache = nil + @namespace = "test-#{SecureRandom.hex}" + primary_cluster = { shards: [:primary_shard_one, :primary_shard_two] } + secondary_cluster = { shards: [:secondary_shard_one, :secondary_shard_two] } + + @cache = lookup_store(expires_in: 60, clusters: [ primary_cluster, secondary_cluster ]) + @primary_cache = lookup_store(expires_in: 60, cluster: primary_cluster) + @secondary_cache = lookup_store(expires_in: 60, cluster: secondary_cluster) + end + + test "writes to both clusters" do + @cache.write("foo", 1) + sleep 0.1 + assert_equal 1, @cache.read("foo") + assert_equal 1, @primary_cache.read("foo") + assert_equal 1, @secondary_cache.read("foo") + end + + test "reads from primary cluster" do + @cache.write("foo", 1) + sleep 0.1 + assert_equal 1, @cache.read("foo") + + @secondary_cache.delete("foo") + assert_equal 1, @cache.read("foo") + + @primary_cache.delete("foo") + assert_nil @cache.read("foo") + end + + test "fetch writes to both clusters" do + @cache.fetch("foo") { 1 } + sleep 0.1 + + assert_equal 1, @cache.read("foo") + assert_equal 1, @primary_cache.read("foo") + assert_equal 1, @secondary_cache.read("foo") + end + + test "fetch reads from primary clusters" do + @cache.fetch("foo") { 1 } + sleep 0.1 + assert_equal 1, @cache.read("foo") + + @primary_cache.delete("foo") + @cache.fetch("foo") { 2 } + + assert_equal 2, @cache.read("foo") + assert_equal 2, @primary_cache.read("foo") + assert_equal 2, @secondary_cache.read("foo") + + @secondary_cache.delete("foo") + assert_equal 2, @cache.fetch("foo") { 3 } + + assert_equal 2, @primary_cache.read("foo") + assert_nil @secondary_cache.read("foo") + end + + test "deletes from both cluster" do + @cache.write("foo", 1) + sleep 0.1 + assert_equal 1, @cache.read("foo") + + @cache.delete("foo") + sleep 0.1 + + assert_nil @cache.read("foo") + assert_nil @primary_cache.read("foo") + assert_nil @secondary_cache.read("foo") + end + + test "multi_writes to both clusters" do + values = { "foo" => "bar", "egg" => "spam" } + @cache.write_multi(values) + sleep 0.1 + assert_equal values, @cache.read_multi("foo", "egg") + assert_equal values, @primary_cache.read_multi("foo", "egg") + assert_equal values, @secondary_cache.read_multi("foo", "egg") + end + + test "delete_matched deletes from both caches" do + values = { "foo" => "bar", "baz" => "zab", "bab" => "dab" } + @cache.write_multi(values) + sleep 0.1 + + @cache.delete_matched("ba%") + sleep 0.1 + + assert_equal({ "foo" => "bar" }, @cache.read_multi(*values.keys)) + assert_equal({ "foo" => "bar" }, @primary_cache.read_multi(*values.keys)) + assert_equal({ "foo" => "bar" }, @secondary_cache.read_multi(*values.keys)) + end + + test "increment and decrement hit both clusters" do + @cache.write("foo", 1, raw: true) + sleep 0.1 + + assert_equal 1, @cache.read("foo", raw: true).to_i + assert_equal 1, @primary_cache.read("foo", raw: true).to_i + assert_equal 1, @secondary_cache.read("foo", raw: true).to_i + + @cache.increment("foo") + sleep 0.1 + + assert_equal 2, @cache.read("foo", raw: true).to_i + assert_equal 2, @primary_cache.read("foo", raw: true).to_i + assert_equal 2, @secondary_cache.read("foo", raw: true).to_i + + @secondary_cache.write("foo", 4, raw: true) + + @cache.decrement("foo") + sleep 0.1 + + assert_equal 1, @cache.read("foo", 
raw: true).to_i + assert_equal 1, @primary_cache.read("foo", raw: true).to_i + assert_equal 3, @secondary_cache.read("foo", raw: true).to_i + end +end diff --git a/test/unit/delete_matched_test.rb b/test/unit/delete_matched_test.rb index b3cd754..c055dd3 100644 --- a/test/unit/delete_matched_test.rb +++ b/test/unit/delete_matched_test.rb @@ -1,3 +1,5 @@ +require "test_helper" + class DeleteMatchedTest < ActiveSupport::TestCase setup do @cache = nil diff --git a/test/unit/stats_test.rb b/test/unit/stats_test.rb index 6dc4091..e5f1c55 100644 --- a/test/unit/stats_test.rb +++ b/test/unit/stats_test.rb @@ -9,13 +9,13 @@ class SolidCache::StatsTest < ActiveSupport::TestCase end def test_stats - @cache = lookup_store(trim_batch_size: 2, max_age: 2.weeks.to_i, max_entries: 1000, shards: [:default, :shard_one]) + @cache = lookup_store(trim_batch_size: 2, max_age: 2.weeks.to_i, max_entries: 1000, shards: [:default, :default2]) expected = { shards: 2, shards_stats: { default: { max_age: 2.weeks.to_i, oldest_age: nil, max_entries: 1000, entries: 0 }, - shard_one: { max_age: 2.weeks.to_i, oldest_age: nil, max_entries: 1000, entries: 0 } + default2: { max_age: 2.weeks.to_i, oldest_age: nil, max_entries: 1000, entries: 0 } } } @@ -23,7 +23,7 @@ def test_stats end def test_stats_with_entries - @cache = lookup_store(trim_batch_size: 2, max_age: 2.weeks.to_i, max_entries: 1000, shards: [:default]) + @cache = lookup_store(trim_batch_size: 2, max_age: 2.weeks.to_i, max_entries: 1000, cluster: { shards: [:default] }) expected_empty = { shards: 1, shards_stats: { default: { max_age: 2.weeks.to_i, oldest_age: nil, max_entries: 1000, entries: 0 } } } diff --git a/test/unit/trimming_test.rb b/test/unit/trimming_test.rb index 9095167..98dda4d 100644 --- a/test/unit/trimming_test.rb +++ b/test/unit/trimming_test.rb @@ -9,7 +9,7 @@ class SolidCache::TrimmingTest < ActiveSupport::TestCase end def test_trims_old_records - @cache = lookup_store(trim_batch_size: 2, max_age: 2.weeks, shards: [:default]) + @cache = lookup_store(trim_batch_size: 2, max_age: 2.weeks, cluster: { shards: [:default] }) @cache.write("foo", 1) @cache.write("bar", 2) assert_equal 1, @cache.read("foo") @@ -28,7 +28,7 @@ def test_trims_old_records end def test_trims_records_when_the_cache_is_full - @cache = lookup_store(trim_batch_size: 2, shards: [:default], max_age: 2.weeks, max_entries: 2) + @cache = lookup_store(trim_batch_size: 2, cluster: { shards: [:default] }, max_age: 2.weeks, max_entries: 2) @cache.write("foo", 1) @cache.write("bar", 2) @@ -45,7 +45,7 @@ def test_trims_records_when_the_cache_is_full def test_trims_old_records_multiple_shards @cache = lookup_store(trim_batch_size: 2) - default_shard_keys, shard_one_keys = 20.times.map { |i| "key#{i}" }.partition { |key| @cache.shard_for_key(key) == :default } + default_shard_keys, shard_one_keys = 20.times.map { |i| "key#{i}" }.partition { |key| @cache.primary_cluster.send(:shard_for_normalized_key, @cache.send(:normalize_key, key, {})) == :default } @cache.write(default_shard_keys[0], 1) @cache.write(default_shard_keys[1], 2) @@ -76,7 +76,7 @@ def test_trims_old_records_multiple_shards assert_equal 7, @cache.read(shard_one_keys[2]) assert_equal 8, @cache.read(shard_one_keys[3]) - [:default, :shard_one].each do |shard| + [:default, :default2].each do |shard| SolidCache::Record.connected_to(shard: shard) do assert_equal 2, SolidCache::Entry.count end