Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Redis connection health checks and restarts #55

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Cable.configure do |settings|
settings.pool_redis_publish = false # set to `true` to enable a pooled connection on publish
settings.redis_pool_size = 5
settings.redis_pool_timeout = 5.0
settings.redis_ping_interval = 15.seconds
settings.restart_error_allowance = 20
end
```

Expand Down Expand Up @@ -197,6 +199,64 @@ class ChatChannel < ApplicationCable::Channel
end
```

## Redis
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding this section on README is amazing, thanks for such effort 😎 👍


Redis has complexities that need to be considered;

1. Redis Pub/Sub works really well until you lose the connection...
2. Redis connections can go stale without activity.
3. Redis DB's have a buffer related to the message sizes called [Output Buffer Limits](https://redis.io/docs/reference/clients/#output-buffer-limits). Exceeding this buffer will not disconnect the connection. It just yields it dead. You cannot know about this except by monitoring logs/metrics.

Here are some ways this shard can help with this.

### Restarting the server

When the first connection is made, the cable server spawns a single pub/sub connection for all subscriptions.
If the connection dies at any point, the server will continue to throw errors unless someone manually restarts the server...

The cable server provides an automated failure rate monitoring/restart function to automate the restart process.

When the server encounters (n) errors are trying to connect to the Redis connection, it restarts the server.
The error rate allowance avoids a vicious cycle > of clients attempting to connect vs server restarts while Redis is down.
Generally, if the Redis connection is down, you'll exceed this error allowance quickly. So you may encounter severe back-to-back restarts if Redis is down for a substantial time.

> NOTE: The automated restart process will also kill all the current client WS connections.
> However, this trade-off allows a fault-tolerant system vs leaving a dead Redis connection hanging around with no pub/sub activity.

You can change this setting. However, we advise not going below 20.

```crystal
Cable.configure do |settings|
settings.restart_error_allowance = 20 # default is 20.
settings.restart_error_allowance = 0 # Use 0 to disable this
end
```

> NOTE: An error log `Cable.restart` will be invoked whenever a restart happens. We highly advise you to monitor these logs.

### Maintain Redis connection activity

When the first connection is made, the cable server starts a Redis PING/PONG task, which runs every 15 seconds. This helps to keep the Redis connection from going stale.

You can change this setting. However, we advise not going over 60 seconds.

```crystal
Cable.configure do |settings|
settings.redis_ping_interval = 15.seconds # default is 15.
end
```

### Increase your Redis [Output Buffer Limits](https://redis.io/docs/reference/clients/#output-buffer-limits)

> Technically, this shard cannot help with this.

Exceeding this buffer should be avoided to ensure a stable pub/sub connection.

Options;

1. Double or triple this setting on your Redis DB. 32Mb is usually the default.
2. Ensure you truncate the message sizes client side.

Check below on the JavaScript section how to communicate with the Cable backend

## JavaScript
Expand Down
134 changes: 134 additions & 0 deletions spec/cable/handler_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,140 @@ describe Cable::Handler do
io_with_context = create_ws_request_and_return_io_and_context(handler, request)[0]
io_with_context.to_s.should contain("404 Not Found")
end

it "restarts the server if too many errors" do
address_chan = start_server
listen_address = address_chan.receive

# no errors
Cable.server.errors.should eq(0)

# connect
Cable.server.connections.size.should eq(0)
ws2 = HTTP::WebSocket.new("ws://#{listen_address}/updates?test_token=ws2")
Cable.server.connections.size.should eq(1)
ws3 = HTTP::WebSocket.new("ws://#{listen_address}/updates?test_token=ws3")
Cable.server.connections.size.should eq(2)
ws4 = HTTP::WebSocket.new("ws://#{listen_address}/updates?test_token=ws4")
Cable.server.connections.size.should eq(3)
ws5 = HTTP::WebSocket.new("ws://#{listen_address}/updates?test_token=ws5")
Cable.server.connections.size.should eq(4)

connections = Cable.server.connections.keys
connections.any? { |c| c.starts_with?("ws2") }.should eq(true)
connections.any? { |c| c.starts_with?("ws3") }.should eq(true)
connections.any? { |c| c.starts_with?("ws4") }.should eq(true)
connections.any? { |c| c.starts_with?("ws5") }.should eq(true)

messages = [
{type: "welcome"}.to_json,
{type: "confirm_subscription", identifier: {channel: "ChatChannel", room: "1"}.to_json}.to_json,
{identifier: {channel: "ChatChannel", room: "1"}.to_json, message: {message: "test", current_user: "ws2"}}.to_json,
]
seq = 0
ping_seq = 0
ws2.on_message do |str|
if str.match(/\{"type":"ping","message":[0-9]{8,12}\}/) && ping_seq < 2
ping_seq += 1
next
end
str.should eq(messages[seq])
seq += 1
ws2.close if seq >= messages.size
end
# subscribe
ws2.send({"command" => "subscribe", "identifier" => {channel: "ChatChannel", room: "1"}.to_json}.to_json)

sleep 0.2

# send message
ws2.send({"command" => "message", "identifier" => {channel: "ChatChannel", room: "1"}.to_json, "data" => {message: "test"}.to_json}.to_json)
# raise error
ws2.send({"command" => "message", "identifier" => {channel: "ChatChannel", room: "1"}.to_json, "data" => {message: "raise"}.to_json}.to_json)

ws2.run

# 1 error
Cable.server.errors.should eq(1)

# connection 1 will be disconnected due to error
Cable.server.connections.size.should eq(3)
connections = Cable.server.connections.keys
connections.any? { |c| c.starts_with?("ws2") }.should eq(false)

messages = [
{type: "welcome"}.to_json,
{type: "confirm_subscription", identifier: {channel: "ChatChannel", room: "1"}.to_json}.to_json,
{identifier: {channel: "ChatChannel", room: "1"}.to_json, message: {message: "test", current_user: "ws3"}}.to_json,
]
seq = 0
ping_seq = 0
ws3.on_message do |str|
if str.match(/\{"type":"ping","message":[0-9]{8,12}\}/) && ping_seq < 2
ping_seq += 1
next
end
str.should eq(messages[seq])
seq += 1
ws3.close if seq >= messages.size
end
# subscribe
ws3.send({"command" => "subscribe", "identifier" => {channel: "ChatChannel", room: "1"}.to_json}.to_json)

sleep 0.2

# send message
ws3.send({"command" => "message", "identifier" => {channel: "ChatChannel", room: "1"}.to_json, "data" => {message: "test"}.to_json}.to_json)
# raise error
ws3.send({"command" => "message", "identifier" => {channel: "ChatChannel", room: "1"}.to_json, "data" => {message: "raise"}.to_json}.to_json)

ws3.run

# 2 errors
Cable.server.errors.should eq(2)

# connection 1 will be disconnected due to error
Cable.server.connections.size.should eq(2)
connections = Cable.server.connections.keys
connections.any? { |c| c.starts_with?("ws3") }.should eq(false)

messages = [
{type: "welcome"}.to_json,
{type: "confirm_subscription", identifier: {channel: "ChatChannel", room: "1"}.to_json}.to_json,
{identifier: {channel: "ChatChannel", room: "1"}.to_json, message: {message: "test", current_user: "ws3"}}.to_json,
]
seq = 0
ping_seq = 0
ws4.on_message do |str|
if str.match(/\{"type":"ping","message":[0-9]{8,12}\}/) && ping_seq < 2
ping_seq += 1
next
end
str.should eq(messages[seq])
seq += 1
ws4.close if seq >= messages.size
end
# subscribe
ws4.send({"command" => "subscribe", "identifier" => {channel: "ChatChannel", room: "1"}.to_json}.to_json)

sleep 0.2

# send message
ws4.send({"command" => "message", "identifier" => {channel: "ChatChannel", room: "1"}.to_json, "data" => {message: "test"}.to_json}.to_json)
# raise error
ws4.send({"command" => "message", "identifier" => {channel: "ChatChannel", room: "1"}.to_json, "data" => {message: "raise"}.to_json}.to_json)

ws4.run

sleep 0.2

# we should have 1 connectoon ws5 open
# but since the server restarted due to volume of errors
# all connections will be closed
# errors will be reset
Cable.server.errors.should eq(0)
Cable.server.connections.size.should eq(0)
end
end
end

Expand Down
2 changes: 2 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ require "./support/channels/*"
Cable.configure do |settings|
settings.route = "/updates"
settings.token = "test_token"
settings.redis_ping_interval = 2.seconds
settings.restart_error_allowance = 2
end

Spec.before_each do
Expand Down
1 change: 1 addition & 0 deletions spec/support/channels/chat_channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class ChatChannel < ApplicationCable::Channel
broadcast_message["message"] = message["message"].to_s
end
broadcast_message["current_user"] = connection.identifier
raise IO::Error.new("Invalid message") if broadcast_message["message"] == "raise"
ChatChannel.broadcast_to("chat_#{params["room"]}", broadcast_message)
end

Expand Down
2 changes: 2 additions & 0 deletions src/cable.cr
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ module Cable
setting pool_redis_publish : Bool = false
setting redis_pool_size : Int32 = 5
setting redis_pool_timeout : Float64 = 5.0
setting redis_ping_interval : Time::Span = 15.seconds
setting restart_error_allowance : Int32 = 20
end

def self.message(event : Symbol)
Expand Down
3 changes: 3 additions & 0 deletions src/cable/handler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ module Cable
# handle all other exceptions
socket.close(HTTP::WebSocket::CloseCode::InternalServerError, "Internal Server Error")
Cable.server.remove_connection(connection_id)
# handle restart
Cable.server.count_error!
Cable.restart if Cable.server.restart?
Cable::Logger.error { "Exception: #{e.message}" }
end
end
Expand Down
34 changes: 34 additions & 0 deletions src/cable/redis_pinger.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
require "tasker"

module Cable
class RedisPinger
private getter task : Tasker::Task

def initialize(@server : Cable::Server)
@task = Tasker.every(Cable.settings.redis_ping_interval) do
check_redis_subscribe
check_redis_publish
rescue e
Cable::Logger.error { "Cable::RedisPinger Exception: #{e.class.name} -> #{e.message}" }
# Restart cable if something happened
Cable.server.count_error!
Cable.restart if Cable.server.restart?
end
end

def stop
@task.cancel
end

def check_redis_subscribe
Cable.server.publish("_internal", "ping")
end

def check_redis_publish
request = Redis::Request.new
request << "ping"
result = @server.redis_subscribe._connection.send(request)
Cable::Logger.debug { "Cable::RedisPinger.check_redis_publish -> #{result}" }
end
end
end
35 changes: 29 additions & 6 deletions src/cable/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Cable
def self.restart
if current_server = @@server
current_server.shutdown
Cable::Logger.error { "Cable.restart" }
end
@@server = Server.new
end
Expand All @@ -32,8 +33,12 @@ module Cable
end
end

getter errors = 0
getter redis_subscribe = Redis.new(url: Cable.settings.url)
getter fiber_channel = ::Channel({String, String}).new
getter pinger : Cable::RedisPinger do
Cable::RedisPinger.new(self)
end

@channels = {} of String => Channels
@channel_mutex = Mutex.new
Expand Down Expand Up @@ -127,16 +132,32 @@ module Cable
end

def shutdown
request = Redis::Request.new
request << "unsubscribe"
redis_subscribe._connection.send(request)
redis_subscribe.close
redis_publish.close
begin
request = Redis::Request.new
request << "unsubscribe"
redis_subscribe._connection.send(request)
redis_subscribe.close
redis_publish.close
rescue e : IO::Error
# the @writer IO is closed already
Cable::Logger.debug { "Cable::Server#shutdown Connection to redis was severed: #{e.message}" }
end
pinger.stop
connections.each do |k, v|
v.close
end
end

def restart?
errors > Cable.settings.restart_error_allowance
end

def count_error!
@channel_mutex.synchronize do
@errors += 1
end
end

private def process_subscribed_messages
server = self
spawn(name: "Cable::Server - process_subscribed_messages") do
Expand All @@ -152,7 +173,9 @@ module Cable
spawn(name: "Cable::Server - subscribe") do
redis_subscribe.subscribe("_internal") do |on|
on.message do |channel, message|
if channel == "_internal" && message == "debug"
if channel == "_internal" && message == "ping"
Cable::Logger.debug { "Cable::Server#subscribe -> PONG" }
elsif channel == "_internal" && message == "debug"
debug
else
fiber_channel.send({channel, message})
Expand Down