Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ReplicationClient #50

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions spec/replication_client_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
require "./spec_helper"
require "../src/replication_client"

module Redis
describe ReplicationClient do
describe ".parse_replication_section" do
it "parses the master's replication section" do
section = <<-SECTION
# Replication\r
role:master\r
connected_slaves:2\r
slave0:ip=10.76.3.39,port=6379,state=stable_sync,lag=0\r
slave1:ip=10.76.1.130,port=6379,state=stable_sync,lag=0\r
master_replid:b08ca5082296cf5b2c1de7207f2bc16bb8da3d80\r

SECTION

data = ReplicationClient::Info::Replication.new(section)

data.role.master?.should eq true
data.connected_replicas.should eq 2
data.replicas.should contain ReplicationClient::Info::Replica.new(
ip: "10.76.3.39",
port: 6379,
state: :stable_sync,
lag: 0.seconds,
)
end

it "parses a replica's replication section" do
section = <<-SECTION
# Replication\r
role:replica\r
master_host:10.76.2.33\r
master_port:9999\r
master_link_status:up\r
master_last_io_seconds_ago:0\r
master_sync_in_progress:0\r

SECTION

data = ReplicationClient::Info::Replication.new(section)

data.role.master?.should eq false
data.role.replica?.should eq true
data.master_host.should eq "10.76.2.33"
data.master_port.should eq 9999
data.master_link_status.should eq "up"
data.master_last_io.not_nil!.should be_within 1.seconds, of: Time.utc
data.master_sync_in_progress?.should eq false
end
end
end
end
6 changes: 4 additions & 2 deletions src/client.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require "db/pool"
require "log"

require "./connection"
require "./log"

module Redis
# The Redis client is the expected entrypoint for this shard. By default, it will connect to localhost:6379, but you can also supply a `URI` to connect to an arbitrary Redis server. SSL, password authentication, and DB selection are all supported.
Expand All @@ -27,7 +29,7 @@ module Redis

# The client holds a pool of connections that expands and contracts as
# needed.
def initialize(uri : URI = URI.parse(ENV.fetch("REDIS_URL", "redis:///")))
def initialize(uri : URI = URI.parse(ENV.fetch("REDIS_URL", "redis:///")), @log = Log)
# defaults as per https://github.com/crystal-lang/crystal-db/blob/v0.11.0/src/db/pool.cr
initial_pool_size = uri.query_params.fetch("initial_pool_size", 1).to_i
max_pool_size = uri.query_params.fetch("max_pool_size", 0).to_i
Expand All @@ -46,7 +48,7 @@ module Redis
retry_attempts: retry_attempts,
retry_delay: retry_delay,
)) do
Connection.new(uri)
Connection.new(uri, log: log)
end
end

Expand Down
57 changes: 1 addition & 56 deletions src/cluster.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# require "./client"
require "./connection"
require "./commands"
require "./read_only_commands"
require "db/pool"
require "set"

Expand Down Expand Up @@ -214,62 +215,6 @@ module Redis
each_master(&.run({"flushdb"}))
end

# Add commands here to route them to read-only replicas.
private READ_ONLY_COMMANDS = %w[
dump
echo
eval_ro
evalsha_ro
exists
expiretime
get
getbit
getrange
hexists
hget
hgetall
hkeys
hlen
hmget
hstrlen
hvals
keys
lcs
lindex
llen
lpos
lrange
mget
pttl
randomkey
scard
sdiff
sinter
sintercard
sismember
smembers
smismember
srandmember
strlen
sunion
ttl
type
xlen
xrange
xrevrange
zcard
zcount
zdiff
zinter
zlexcount
zrandmember
zrange
zrangebylex
zrangebyscore
zrank
zrevrangebylex
].to_set

def run(command full_command)
if full_command.empty?
raise ArgumentError.new("Redis commands must have at least one component")
Expand Down
8 changes: 3 additions & 5 deletions src/connection.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@ require "./pipeline"
require "./value"
require "./transaction"
require "./writer"
require "./log"

module Redis
# The connection wraps the TCP connection to the Redis server.
class Connection
include Commands

# :nodoc:
LOG = ::Log.for(self)

@socket : TCPSocket | OpenSSL::SSL::Socket::Client

# We receive all connection information in the URI.
#
# SSL connections require specifying the `rediss://` scheme.
# Password authentication uses the URI password.
# DB selection uses the URI path.
def initialize(@uri = URI.parse("redis:///"))
def initialize(@uri = URI.parse("redis:///"), @log = Log)
host = uri.host.presence || "localhost"
port = uri.port || 6379
socket = TCPSocket.new(host, port)
Expand Down Expand Up @@ -355,7 +353,7 @@ module Redis
@writer.encode command
flush
result = read
LOG.debug &.emit "redis", command: command[0...2].join(' '), duration_ms: (Time.monotonic - start).total_milliseconds
@log.debug &.emit "redis", command: command[0...2].join(' '), duration_ms: (Time.monotonic - start).total_milliseconds
return result
rescue ex : IO::Error
if retries > 0
Expand Down
6 changes: 6 additions & 0 deletions src/log.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
require "log"

module Redis
# Default Redis log
Log = ::Log.for(self)
end
182 changes: 182 additions & 0 deletions src/read_only_commands.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
module Redis
# Commands in this set are routed to replicas by `Redis::Cluster` and
# `Redis::ReplicationClient`.
#
# You can add additional commands that this shard does not yet know about
# (for example, one provided by a custom Redis module) by using the `<<` method:
#
# ```
# Redis::READ_ONLY_COMMANDS << "mymodule.mycommand"
# ```
READ_ONLY_COMMANDS = %w[
bf.card
bf.debug
bf.exists
bf.info
bf.mexists
bf.scandump
bitcount
bitfield_ro
bitpos
cf.compact
cf.count
cf.debug
cf.exists
cf.info
cf.mexists
cf.scandump
cms.info
cms.query
dbsize
dump
eval_ro
evalsha_ro
exists
expiretime
fcall_ro
ft._aliasaddifnx
ft._aliasdelifx
ft._list
ft.aggregate
ft.aliasadd
ft.aliasdel
ft.aliasupdate
ft.config
ft.cursor
ft.debug
ft.dictadd
ft.dictdel
ft.dictdump
ft.explain
ft.explaincli
ft.get
ft.info
ft.mget
ft.profile
ft.search
ft.spellcheck
ft.sugget
ft.suglen
ft.syndump
ft.tagvals
geodist
geohash
geopos
georadius_ro
georadiusbymember_ro
geosearch
get
getbit
getrange
hexists
hget
hgetall
hkeys
hlen
hmget
hrandfield
hscan
hstrlen
hvals
json.arrindex
json.arrlen
json.debug
json.get
json.mget
json.objkeys
json.objlen
json.resp
json.strlen
json.type
keys
lcs
lindex
llen
lolwut
lpos
lrange
mget
pexpiretime
pfcount
pttl
randomkey
redisgears_2.clusterset
redisgears_2.clustersetfromshard
redisgears_2.forceshardsconnection
redisgears_2.hello
redisgears_2.infocluster
redisgears_2.innercommunication
redisgears_2.networktest
redisgears_2.refreshcluster
scan
scard
sdiff
sinter
sintercard
sismember
smembers
smismember
sort_ro
srandmember
sscan
strlen
substr
sunion
tdigest.byrank
tdigest.byrevrank
tdigest.cdf
tdigest.info
tdigest.max
tdigest.min
tdigest.quantile
tdigest.rank
tdigest.revrank
tdigest.trimmed_mean
timeseries.clusterset
timeseries.clustersetfromshard
timeseries.forceshardsconnection
timeseries.hello
timeseries.infocluster
timeseries.innercommunication
timeseries.networktest
timeseries.refreshcluster
topk.info
topk.list
topk.query
touch
ts.get
ts.info
ts.mget
ts.mrange
ts.mrevrange
ts.queryindex
ts.range
ts.revrange
ttl
type
xlen
xpending
xrange
xread
xrevrange
zcard
zcount
zdiff
zinter
zintercard
zlexcount
zmscore
zrandmember
zrange
zrangebylex
zrangebyscore
zrank
zrevrange
zrevrangebylex
zrevrangebyscore
zrevrank
zscan
zscore
zunion
].to_set
end
Loading