Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Swaps out the redis implementation for a different shard #48

Closed
wants to merge 11 commits into from
4 changes: 2 additions & 2 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ dependencies:
github: spider-gazelle/tasker
version: ~> 2.0
redis:
github: stefanwille/crystal-redis
version: ~> 2.8.0
github: jgaskins/redis
branch: master
habitat:
github: luckyframework/habitat

Expand Down
10 changes: 5 additions & 5 deletions spec/cable/connection_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,19 @@ describe Cable::Connection do

describe ".identified_by" do
it "uses the right identifier name for it" do
connect do |connection, socket|
connect do |connection, _socket|
connection.identifier.should eq("98")
end

connect(connection_class: ConnectionWithDifferentIndentifierTest) do |connection, socket|
connect(connection_class: ConnectionWithDifferentIndentifierTest) do |connection, _socket|
connection.identifier.should eq("98")
end
end
end

describe ".owned_by" do
it "uses the right identifier name for it" do
connect do |connection, socket|
connect do |connection, _socket|
connection.current_user.not_nil!.email.should eq("[email protected]")
connection.organization.not_nil!.name.should eq("Acme Inc.")
end
Expand Down Expand Up @@ -478,7 +478,7 @@ def builds_request(token : String)
"Sec-WebSocket-Protocol" => "actioncable-v1-json, actioncable-unsupported",
"Sec-WebSocket-Version" => "13",
}
request = HTTP::Request.new("GET", "#{Cable.settings.route}?test_token=#{token}", headers)
HTTP::Request.new("GET", "#{Cable.settings.route}?test_token=#{token}", headers)
end

def builds_request(token : Nil)
Expand All @@ -489,7 +489,7 @@ def builds_request(token : Nil)
"Sec-WebSocket-Protocol" => "actioncable-v1-json, actioncable-unsupported",
"Sec-WebSocket-Version" => "13",
}
request = HTTP::Request.new("GET", "#{Cable.settings.route}", headers)
HTTP::Request.new("GET", "#{Cable.settings.route}", headers)
end

private class DummySocket < HTTP::WebSocket
Expand Down
26 changes: 23 additions & 3 deletions spec/cable/handler_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,32 @@ describe Cable::Handler do
seq += 1
ws2.close if seq >= messages.size
end
ws2.on_close do |code, _reason|
code.should eq(HTTP::WebSocket::CloseCode::AbnormalClosure)
end
ws2.send({"command" => "subscribe", "identifier" => {channel: "ChatChannel", room: "1"}.to_json}.to_json)

ws2.run

# should be 1 connection open
Cable.server.connections.size.should eq(1)
end

# TODO: Error writing to socket: Broken pipe (IO::Error)
# it "rejected" do
# address_chan = start_server
# listen_address = address_chan.receive

# ws2 = HTTP::WebSocket.new("ws://#{listen_address}/updates?test_token=reject")
# ws2.on_close do |code, reason|
# code.should eq(HTTP::WebSocket::CloseCode::NormalClosure)
# reason.should eq("Farewell")
# end
# ws2.run

# # should be zero connections open
# Cable.server.connections.size.should eq(0)
# end
end

describe "receive message from client" do
Expand Down Expand Up @@ -104,7 +126,6 @@ describe Cable::Handler do

describe "server broadcast to channels" do
it "sends and clients receives the message" do
Cable.restart
address_chan = start_server
listen_address = address_chan.receive

Expand Down Expand Up @@ -216,8 +237,7 @@ private def start_server

spawn do
# Make pinger real fast so we don't need to wait
http_ref = nil
http_server = http_ref = HTTP::Server.new([Cable::Handler(ApplicationCable::Connection).new])
http_server = HTTP::Server.new([Cable::Handler(ApplicationCable::Connection).new])
address = http_server.bind_unused_port
address_chan.send(address)
http_server.listen
Expand Down
2 changes: 2 additions & 0 deletions spec/support/application_cable/connection.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module ApplicationCable
if tk = token
self.identifier = tk
end

reject_unauthorized_connection if token == "reject"
end
end
end
4 changes: 1 addition & 3 deletions src/cable.cr
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ module Cable
setting token : String = "token", example: "token"
setting url : String = ENV.fetch("REDIS_URL", "redis://localhost:6379"), example: "redis://localhost:6379"
setting disable_sec_websocket_protocol_header : Bool = false
setting pool_redis_publish : Bool = false
setting redis_pool_size : Int32 = 5
setting redis_pool_timeout : Float64 = 5.0
setting redis_ping_interval : Time::Span = 15.seconds
end

def self.message(event : Symbol)
Expand Down
10 changes: 5 additions & 5 deletions src/cable/connection.cr
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ module Cable
property {{type_definition.var}} : {{type_definition.type}}?
end

def initialize(@request : HTTP::Request, @socket : HTTP::WebSocket)
@token = @request.query_params.fetch(Cable.settings.token, nil)
def initialize(request : HTTP::Request, @socket : HTTP::WebSocket)
@token = request.query_params.fetch(Cable.settings.token, nil)
@id = UUID.random

begin
Expand All @@ -56,13 +56,13 @@ module Cable
return true unless Connection::CHANNELS.has_key?(connection_identifier)

Connection::CHANNELS[connection_identifier].each do |identifier, channel|
channel.close
Connection::CHANNELS[connection_identifier].delete(identifier)
channel.close
rescue e : IO::Error
Cable::Logger.error { "IO::Error Exception: #{e.message} -> #{self.class.name}#close" }
end

Connection::CHANNELS.delete(connection_identifier)
Connection::CHANNELS.delete(connection_identifier) if Connection::CHANNELS.has_key?(connection_identifier)
Cable::Logger.info { "Terminating connection #{connection_identifier}" }

socket.close
Expand Down Expand Up @@ -126,7 +126,7 @@ module Cable
def reject(payload : Cable::Payload)
if channel = Connection::CHANNELS[connection_identifier].delete(payload.identifier)
channel.unsubscribed
Cable::Logger.info { "#{channel.class.to_s} is transmitting the subscription rejection" }
Cable::Logger.info { "#{channel.class} is transmitting the subscription rejection" }
socket.send({type: Cable.message(:rejection), identifier: payload.identifier}.to_json)
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/cable/debug.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ module Cable
Cable::Logger.debug { "-" * 80 }
Cable::Logger.debug { "Some Good Information" }
Cable::Logger.debug { "Connections" }
@connections.each do |k, v|
@connections.each do |k, _v|
Cable::Logger.debug { "Connection Key: #{k}" }
end
Cable::Logger.debug { "Channels" }
Expand Down
27 changes: 23 additions & 4 deletions src/cable/handler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,31 @@ module Cable
class Handler(T)
include HTTP::Handler

def on_error(&@on_error : Exception ->) : self
self
end

def call(context)
return call_next(context) unless ws_route_found?(context) && websocket_upgrade_request?(context)

remote_address = context.request.remote_address
path = context.request.path
Cable::Logger.info { "Started GET \"#{path}\" [WebSocket] for #{remote_address} at #{Time.utc.to_s}" }
Cable::Logger.info { "Started GET \"#{path}\" [WebSocket] for #{remote_address} at #{Time.utc}" }

unless Cable.settings.disable_sec_websocket_protocol_header
context.response.headers["Sec-WebSocket-Protocol"] = "actioncable-v1-json"
end

ws = HTTP::WebSocketHandler.new do |socket, context|
ws = HTTP::WebSocketHandler.new do |socket, _context|
connection = T.new(context.request, socket)
connection_id = connection.connection_identifier

# we should not add any connections which have been rejected
Cable.server.add_connection(connection) unless connection.connection_rejected?
if connection.connection_rejected?
Cable::Logger.info { "Connection rejected" }
else
Cable.server.add_connection(connection)
end

# Send welcome message to the client
socket.send({type: Cable.message(:welcome)}.to_json)
Expand All @@ -36,17 +44,28 @@ module Cable
socket.on_message do |message|
begin
connection.receive(message)
rescue e : KeyError
# handle unknown/malformed messages
socket.close(HTTP::WebSocket::CloseCode::InvalidFramePayloadData, "Invalid message")
Cable::Logger.error { "KeyError Exception: #{e.message}" }
rescue e : Cable::Connection::UnathorizedConnectionException
# do nothing, this is planned
socket.close(HTTP::WebSocket::CloseCode::NormalClosure, "Farewell")
rescue e : IO::Error
Cable::Logger.error { "#{e.class.name} Exception: #{e.message} -> #{self.class.name}#call { socket.on_message(message) }" }
# Redis may have some error, restart Cable
socket.close(HTTP::WebSocket::CloseCode::NormalClosure, "Farewell")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #54 @mjeffrey18 added the socket.close, but this rescue branch doesn't exist on master.... Should we be closing the socket here too? If so, then which CloseCode should be used?

Cable.restart
rescue e : Exception
socket.close(HTTP::WebSocket::CloseCode::InternalServerError, "Internal Server Error")
Cable::Logger.error { "Exception: #{e.message}" }
end
end

socket.on_close do
ws_pinger.stop
Cable.server.remove_connection(connection_id)
Cable::Logger.info { "Finished \"#{path}\" [WebSocket] for #{remote_address} at #{Time.utc.to_s}" }
Cable::Logger.info { "Finished \"#{path}\" [WebSocket] for #{remote_address} at #{Time.utc}" }
end
end

Expand Down
18 changes: 0 additions & 18 deletions src/cable/monkeypatch/redis.cr

This file was deleted.

4 changes: 2 additions & 2 deletions src/cable/payload.cr
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ module Cable
end
end

return params_result
params_result
end

private def process_hash(hash : JSON::Any)
return process_hash(hash.as_h)
process_hash(hash.as_h)
end
end
end
34 changes: 34 additions & 0 deletions src/cable/redis_pinger.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module Cable
class RedisPinger
private getter task : Tasker::Task

def initialize(@server : Cable::Server)
@task = Tasker.every(Cable.settings.redis_ping_interval) do
check_redis_subscribe
check_redis_publish
rescue e
# Restart cable if something happened
Cable.restart
end
end

def stop
@task.cancel
end

# since @server.redis_subscribe connection is called on a block loop
# we basically cannot call ping outside of the block
# instead, we just spin up another new redis connection
# then publish a special channel/message broadcast
# the @server.redis_subscribe picks up this special combination
# and calls ping on the block loop for us
def check_redis_subscribe
Cable.server.publish("_internal", "ping")
end

def check_redis_publish
result = @server.redis_publish.run({"ping"})
Cable::Logger.debug { "Cable::RedisPinger.check_redis_publish -> #{result}" }
end
end
end
Loading