Idle pub/sub connections #7
It’s not necessary to use one or the other here (you should be able to use a […]). Running a command against a […]. Since subscribing monopolizes a connection and blocks a single fiber basically for the lifetime of the application, it didn’t need a […].

It looks like the […]

I haven’t seen it with this shard specifically, but I don’t doubt it happens. At my previous company, we had Redis connection errors all the time using the Ruby […]. Periodic pings might help, but I actually don’t know if the Redis […]. Since most Redis commands are request/response, the retry mechanism built into […]
Hi, @jgaskins thanks for the reply! Really appreciate it! It's been a nightmare bug... I ended up trying to hack together a […]:

```crystal
private def subscribe
  spawn(name: "Cable::Server - subscribe") do
    redis_subscribe.subscribe("_internal") do |subscription|
      subscription.on_message do |channel, message|
        if channel == "_internal" && message == "ping"
          Cable::Logger.debug { "Cable::Server#subscribe -> PONG" }
        else
          fiber_channel.send({channel, message})
          Cable::Logger.debug { "Cable::Server#subscribe channel:#{channel} message:#{message}" }
        end
      end
    end
  end
end
```

```crystal
require "schedule"

module Cable
  class RedisPinger
    @@started : Bool = false
    @@seconds : Int32 | Float64 = Cable.settings.redis_ping

    def self.run_every(value : Int32 | Float64, &block)
      @@seconds = value
      yield
      @@seconds = Cable.settings.redis_ping
    end

    def self.start(server : Cable::Server)
      new(server).start unless @@started
      @@started = true
    end

    def self.seconds
      @@seconds
    end

    def initialize(@server : Cable::Server)
    end

    def start
      runner = Schedule::Runner.new
      runner.every(Cable::RedisPinger.seconds.seconds) do
        check_redis_subscribe
        check_redis_publish
      rescue e
        @@started = false
        ExceptionService.notify(e)
        raise Schedule::StopException.new("Stopped")
      end
    end

    def check_redis_subscribe
      Cable.server.publish("_internal", "ping")
    end

    def check_redis_publish
      result = @server.redis_publish.run({"ping"})
      Cable::Logger.debug { "Cable::RedisPinger.check_redis_publish -> #{result}" }
    end
  end
end
```

Here is the scenario: […]
Debug efforts

I've tried everything I could think of, including […]

The worst part is, we've been using ActionCable ruby for 2+ years and AnyCable Go for 6+ months without this issue. Unless I can find some other solution, I'm thinking about using RabbitMQ instead of Redis, but that means a lot more changes again, so I guess it's the last resort.

Redis Labs response

[…]
debug json

```crystal
def debug_json
  _channels = {} of String => Set(String)

  @channels.each do |k, v|
    _channels[v.first.class.to_s] ||= Set{k}
    _channels[v.first.class.to_s] << k
  end

  {
    "connections"      => @connections.size,
    "channels"         => @channels.size,
    "channels_mounted" => _channels,
    "connections_mounted" => @connections.map do |key, connection|
      connections_mounted_channels = [] of Hash(String, String | Nil)

      @channels.each do |_, v|
        v.each do |channel|
          next unless channel.connection.connection_identifier == key

          connections_mounted_channels << {
            "channel" => channel.class.to_s,
            "key"     => channel.stream_identifier,
          }
        end
      end

      {
        "key"        => key,
        "identifier" => connection.identifier,
        "started_at" => connection.started_at.to_s("%Y-%m-%dT%H:%M:%S.%6N"),
        "channels"   => connections_mounted_channels,
      }
    end,
  }
end
```
@mjeffrey18 This is all super helpful! I couldn't reproduce it myself (I spun up a Redis server on DigitalOcean yesterday and subscribed to a subject on it from my house, sending a message once every 1-2 hours from a separate process, and it's still receiving messages this morning), so there might be something in the specific infrastructure you're running on that is different from what I'm seeing.

We can twiddle TCP keepalive bits in Crystal, so we might as well give that a shot. That should help avoid race conditions and other weird situations with my off-the-cuff suggestion above. 😄 I don't know what reasonable settings are for it, but maybe some tuning can help. The hard part is how long the feedback loop is.
Hi @jgaskins, thank you so much for trying to debug this issue. Yeah, it seems to only happen intermittently, which makes this way more annoying haha. I've also never encountered the issue locally, only in staging/production. I've seen it be fine all day, and sometimes it breaks within a short period of time.

I posted this issue in the luckyframework Discord chat and it looks like a good few others experienced the same issue at some point, but no one was sure why it happened. I think you're onto something with your suggestion. If you can give me any pointers as to how I might implement/investigate those settings in Crystal in relation to this shard, I can start poking around to see if I can find a solution while replicating the issue. Thanks again for your support!
@mjeffrey18 Good find on that Redis issue! Good to know it's not just this shard or even just Crystal experiencing this problem, but at least we seem to be able to respond to it.

I imagine the best place to start with TCP keepalive probes would be where we create the socket. Right now, the only option we're setting is […]. If I understand it correctly, setting […].

The hard part is likely figuring out what to do if a keepalive probe closes the socket. This will likely raise an […]
Thanks @jgaskins, really appreciate it! I ended up going with this implementation for the first attempt:

```crystal
def initialize(@uri = URI.parse("redis:///"))
  host = uri.host.presence || "localhost"
  port = uri.port || 6379
  socket = TCPSocket.new(host, port)
  socket.sync = false
  socket.tcp_keepalive_count = 3
  socket.tcp_keepalive_idle = 60
  socket.tcp_keepalive_interval = 30
  # ... (rest of Connection#initialize unchanged)
```

I'll keep you posted if we manage to survive a few days without any issues... Regarding the recovery, if we get this first part working, I'll look at a way to simulate the disconnect and then see how to gracefully recover. Thanks again for your help so far!
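As an aside: Crystal's `tcp_keepalive_*` setters map onto standard TCP socket options, so the same three parameters can be set on any TCP socket. The sketch below is an illustration in Ruby, not the shard's code, and assumes Linux, where the `TCP_KEEPCNT`/`TCP_KEEPIDLE`/`TCP_KEEPINTVL` constants are available:

```ruby
require "socket"

# Throwaway local listener so we have a connected TCP socket to configure.
server = TCPServer.new("127.0.0.1", 0)
sock = TCPSocket.new("127.0.0.1", server.addr[1])

# Enable keepalive probes, then tune them (Linux-specific constants).
sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, 3)    # give up after 3 failed probes
sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPIDLE, 60)  # first probe after 60s of idle
sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, 30) # then probe every 30s

puts sock.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT).int # 3
```

With these values, a dead peer is detected after roughly 60 + 3 × 30 seconds of silence, which matches the intent of the `3,60,30` configuration discussed in this thread.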
Hey @jgaskins, it didn't work out; it stalled a few hours after deploying. However, I may have found the issue :-) https://redis.io/topics/clients#output-buffers-limits

Turns out one of our WS channels can on occasion send a fair amount of data (by mistake). It only happens in rare cases when the record has a ton of associations. I believe the buffer limit is being reached, the socket gets killed by the Redis server, and the client just silently did nothing... until I added the above tcp_keepalive stuff, which caused the client to crash and gave me some hint as to what may be going on!

Anyway, I will check again for the next few days and report back. 🙏
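For reference, the limit described at that link is Redis's `client-output-buffer-limit` for the `pubsub` client class. The shipped redis.conf default looks like this:

```conf
# redis.conf default: disconnect a pub/sub client whose output buffer
# exceeds 32 MB outright (hard limit), or stays above 8 MB for 60
# seconds (soft limit).
client-output-buffer-limit pubsub 32mb 8mb 60
```

A burst of oversized broadcasts, or a consumer that stalls, can trip either limit, and the server closes the connection; the client may not notice until it next tries to use the socket, which matches the silent failure described above.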
@mjeffrey18 Nice discovery! I didn't realize pub/sub set different I/O limits than plain data-structure queries, but it makes sense.
I wonder if this is the same thing I'm seeing in my app related to cable-cr/cable#44 🤔 Since it looks like this shard is just using db/pool, maybe moving Cable over to this would fix the issue? I'll try to give that a shot and test it out on my app. If anyone has any other ideas/suggestions on what we can do to improve Cable, let me know!
My fork of […]
Hey @jwoertink / @jgaskins, apologies for the delayed response. Here is what we ended up running, starting with the TCP keepalive patch to the Redis connection:

```crystal
module Redis
  class Connection
    def initialize(@uri = URI.parse("redis:///"))
      host = uri.host.presence || "localhost"
      port = uri.port || 6379
      socket = TCPSocket.new(host, port)
      socket.sync = false

      # new lines added to help with tcp_keepalive
      count, idle, interval = Config.settings.tcp_keepalive_settings.split(",").map(&.chomp.strip.to_i)
      socket.tcp_keepalive_count = count
      socket.tcp_keepalive_idle = idle
      socket.tcp_keepalive_interval = interval
      # end

      # Check whether we should use SSL
      if uri.scheme == "rediss"
        socket = OpenSSL::SSL::Socket::Client.new(socket)
        socket.sync = false
      end

      @socket = socket
      @parser = Parser.new(@socket)

      pipeline do |_redis|
        # Authentication
        if password = uri.password
          run({"auth", password})
        end

        # DB select
        db = if {"", "/"}.includes?(uri.path)
               "0"
             else
               uri.path[1..-1]
             end
        run({"select", db}) unless db == "0"
      end
    end
  end
end
```

The keepalive values come from our app-level config:

```crystal
class Config
  DEFAULT_REDIS_POOLING_QUERY    = "?initial_pool_size=50&max_pool_size=50&max_idle_pool_size=50"
  DEFAULT_POSTGRES_POOLING_QUERY = "?initial_pool_size=5&max_pool_size=5&max_idle_pool_size=5"
  DEFAULT_TCP_KEEPALIVE_SETTINGS = "3,60,30"
  DEFAULT_ALLOWED_ERRORS         = "Lucky::UnknownAcceptHeaderError"

  Habitat.create do
    setting app_name : String = ENV.fetch("APP_NAME", LuckyEnv.environment)
    setting redis_url : String = ENV.fetch("REDIS_URL", "redis://localhost:6379"), example: "redis://localhost:6379"
    setting redis_pooling_query : String = ENV.fetch("REDIS_POOLING_QUERY", DEFAULT_REDIS_POOLING_QUERY)
    setting postgres_pooling_query : String = ENV.fetch("POSTGRES_POOLING_QUERY", DEFAULT_POSTGRES_POOLING_QUERY)
    setting tcp_keepalive_settings : String = ENV.fetch("TCP_KEEPALIVE_SETTINGS", DEFAULT_TCP_KEEPALIVE_SETTINGS)
    setting allowed_errors : Set(String) = ENV.fetch("ALLOWED_ERRORS", DEFAULT_ALLOWED_ERRORS).split(",").map(&.chomp.strip).to_set
  end
end
```
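The `TCP_KEEPALIVE_SETTINGS` value is a single comma-separated string. As an illustration (in Ruby rather than Crystal), the parsing done in the connection patch amounts to:

```ruby
# Parse a "count,idle,interval" keepalive setting string.
# The default "3,60,30" means: give up after 3 failed probes,
# send the first probe after 60s idle, then probe every 30s.
raw = ENV.fetch("TCP_KEEPALIVE_SETTINGS", "3,60,30")
count, idle, interval = raw.split(",").map { |part| part.strip.to_i }

puts [count, idle, interval].inspect # [3, 60, 30] when the env var is unset
```

Keeping the three tunables in one environment variable makes it easy to adjust them per deployment without a code change.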
```crystal
# In the cable main class
module Cable
  VERSION = "0.1.0"

  INTERNAL = {
    message_types: {
      welcome:      "welcome",
      disconnect:   "disconnect",
      ping:         "ping",
      confirmation: "confirm_subscription",
      rejection:    "reject_subscription",
      unsubscribe:  "confirm_unsubscription",
    },
    disconnect_reasons: {
      unauthorized:    "unauthorized",
      invalid_request: "invalid_request",
      server_restart:  "server_restart",
    },
    default_mount_path: "/cable",
    protocols:          ["actioncable-v1-json", "actioncable-unsupported"],
  }

  def self.message(event : Symbol)
    INTERNAL[:message_types][event]
  end

  # this keeps us close to the open source config but using our own
  Habitat.create do
    setting route : String = "/cable", example: "/cable"
    setting token : String = "token", example: "token"
    setting socket_ping : Int32, example: 3
    setting redis_ping : Int32, example: 3
    setting url : String = Config.settings.redis_url + Config.settings.redis_pooling_query
  end
end

# Configure your settings
Cable.configure do |settings|
  settings.socket_ping = 3
  settings.redis_ping = 15 # not this, we'll discuss this in point 4
end
```
```crystal
require "schedule"

module Cable
  class RedisPinger
    @@started : Bool = false
    @@seconds : Int32 | Float64 = Cable.settings.redis_ping

    def self.run_every(value : Int32 | Float64, &block)
      @@seconds = value
      yield
      @@seconds = Cable.settings.redis_ping
    end

    def self.start(server : Cable::Server)
      new(server).start unless @@started
      @@started = true
    end

    def self.seconds
      @@seconds
    end

    def initialize(@server : Cable::Server)
    end

    def start
      runner = Schedule::Runner.new
      runner.every(Cable::RedisPinger.seconds.seconds) do
        check_redis_subscribe
        check_redis_publish
      rescue e
        ExceptionService.notify(e)
        Cable.restart # critical for recovery
      end
    end

    # since @server.redis_subscribe connection is called on a block loop
    # we basically cannot call ping outside of the block
    # instead, we just spin up another new redis connection
    # then publish a special channel/message broadcast
    # the @server.redis_subscribe picks up this special combination
    # and calls ping on the block loop for us
    def check_redis_subscribe
      Cable.server.publish("_internal", "ping")
    end

    def check_redis_publish
      result = @server.redis_publish.run({"ping"})
      Cable::Logger.debug { {source: "cable", message: "Cable::RedisPinger.check_redis_publish -> #{result}"} }
    end
  end
end
```

This needs to be started in the handler (I'll share the complete handler code at the end):

```crystal
Cable::RedisPinger.start Cable.server
```

Changes also need to be made in the `Cable::Server#subscribe` logic to support this:

```crystal
require "mutex"
require "set"

module Cable
  class Server
    private def subscribe
      spawn(name: "Cable::Server - subscribe") do
        redis_subscribe.subscribe("_internal") do |subscription|
          subscription.on_message do |channel, message|
            if channel == "_internal" && message == "ping"
              Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#subscribe -> PONG"} }
            elsif channel == "_internal" && message == "debug"
              Cable.server.debug
            else
              fiber_channel.send({channel, message})
              Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#subscribe channel:#{channel} message:#{message}"} }
            end
          end
        end
      end
    end
  end
end
```
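The code comments above describe the trick: a subscribed Redis connection is stuck inside its blocking subscribe loop, so the pinger publishes to the reserved `_internal` channel from a second connection, and the subscriber treats that message as its pong. Here's an illustrative Ruby simulation of that flow, with a plain `Queue` standing in for the broker (no Redis involved):

```ruby
# A Queue stands in for the Redis pub/sub channel.
broker = Queue.new
delivered = Queue.new

# Subscriber loop: the analogue of redis_subscribe.subscribe("_internal").
Thread.new do
  loop do
    channel, message = broker.pop
    if channel == "_internal" && message == "ping"
      delivered << :pong # health check handled inside the blocking loop
    else
      delivered << [channel, message] # real traffic forwarded to the app
    end
  end
end

# The pinger cannot PING the subscribed connection, so it "publishes" instead.
broker << ["_internal", "ping"]
broker << ["chat_1", "hello"]

puts delivered.pop.inspect # :pong
puts delivered.pop.inspect # ["chat_1", "hello"]
```

The key property is that a successful round trip proves the subscribe loop itself is alive, not just that some other connection can reach Redis.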
```crystal
# Handle incoming message and echo back to the client
socket.on_message do |message|
  begin
    connection.receive(message)
  rescue e : Cable::Connection::UnathorizedConnectionException
    # do nothing, we're all good
  rescue e : IO::Error
    Cable::Logger.error { {logger: "Cable", message: "#{e.class.name} Exception: #{e.message} -> #{self.class.name}#call { socket.on_message(message) }"} }
    ExceptionService.notify(e)
    Cable.restart # This is the critical part
  rescue e : Exception
    Cable::Logger.error { {logger: "Cable", message: "#{e.class.name} Exception: #{e.message} -> #{self.class.name}#call { socket.on_message(message) }"} }
    ExceptionService.notify(e)
  end
end
```

Since all these changes, everything has been amazing! There are a bunch of other small improvements we made across the board, but those won't help with this issue, so there's no point in mentioning them. Sharing our current server / handler classes:

```crystal
require "http/server"

module Cable
  class Handler(T)
    include HTTP::Handler

    def on_error(&@on_error : Exception ->) : self
      self
    end

    def call(context)
      return call_next(context) unless ws_route_found?(context) && websocket_upgrade_request?(context)

      remote_address = context.request.remote_address
      path = context.request.path
      Cable::Logger.debug { {logger: "Cable", message: "Started GET \"#{path}\" [WebSocket] for #{remote_address} at #{Time.utc}"} }

      unless ENV["DISABLE_SEC_WEBSOCKET_PROTOCOL_HEADER"]?
        context.response.headers["Sec-WebSocket-Protocol"] = "actioncable-v1-json"
      end

      ws = HTTP::WebSocketHandler.new do |socket, _context|
        connection = T.new(context.request, socket)
        connection_id = connection.connection_identifier

        # we should not add any connections which have been rejected
        if connection.connection_rejected?
          Cable.instrument(:reject_connection)
        else
          Cable.server.add_connection(connection)
        end

        # Send welcome message to the client
        socket.send({type: Cable.message(:welcome)}.to_json)

        Cable::RedisPinger.start Cable.server
        Cable::WebsocketPinger.start socket

        socket.on_ping do
          socket.pong context.request.path
          Cable::Logger.debug { {logger: "Cable", message: "Ping received"} }
        end

        # Handle incoming message and echo back to the client
        socket.on_message do |message|
          begin
            connection.receive(message)
          rescue e : Cable::Connection::UnathorizedConnectionException
            # do nothing, we're all good
          rescue e : IO::Error
            Cable::Logger.error { {logger: "Cable", message: "#{e.class.name} Exception: #{e.message} -> #{self.class.name}#call { socket.on_message(message) }"} }
            ExceptionService.notify(e)
            Cable.restart
          rescue e : Exception
            Cable::Logger.error { {logger: "Cable", message: "#{e.class.name} Exception: #{e.message} -> #{self.class.name}#call { socket.on_message(message) }"} }
            ExceptionService.notify(e)
          end
        end

        socket.on_close do
          Cable.server.remove_connection(connection_id)
          Cable::Logger.debug { {logger: "Cable", message: "Finished \"#{path}\" [WebSocket] for #{remote_address} at #{Time.utc}"} }
        end
      end

      Cable::Logger.debug { {logger: "Cable", message: "Successfully upgraded to WebSocket (REQUEST_METHOD: GET, HTTP_CONNECTION: Upgrade, HTTP_UPGRADE: websocket)"} }
      ws.call(context)
    rescue e
      ExceptionService.notify(e)
      raise e
    end

    private def websocket_upgrade_request?(context)
      return unless upgrade = context.request.headers["Upgrade"]?
      return unless upgrade.compare("websocket", case_insensitive: true) == 0

      context.request.headers.includes_word?("Connection", "Upgrade")
    end

    private def ws_route_found?(context)
      return true if context.request.path === Cable.settings.route

      false
    end
  end
end
```
```crystal
require "mutex"
require "set"

module Cable
  alias Channels = Set(Cable::Channel)

  TRACE_KEYS = {
    connection:               "http.websocket.connection_count",
    broadcast:                "http.websocket.broadcast_count",
    tags:                     Array(String).new,
    connection_success_tags:  %w[status:success],
    connection_rejected_tags: %w[status:rejected],
  }

  def self.server
    @@server ||= Server.new
  end

  def self.restart
    if current_server = @@server
      current_server.shutdown
    end
    @@server = Server.new
  end

  def self.instrument(action : Symbol, tags = TRACE_KEYS[:tags])
    case action
    when :add_connection
      Datadog.metrics.increment TRACE_KEYS[:connection], tags: TRACE_KEYS[:connection_success_tags]
    when :reject_connection
      Datadog.metrics.increment TRACE_KEYS[:connection], tags: TRACE_KEYS[:connection_rejected_tags]
    when :broadcast
      Datadog.metrics.increment TRACE_KEYS[:broadcast], tags: tags
    end
  rescue e
    Cable::Logger.error { {logger: "Cable", message: "Instrument Exception: #{e.message}"} }
    ExceptionService.notify(e)
  end

  class Server
    include Debug

    getter connections = {} of String => Connection
    getter redis_subscribe = Redis::Connection.new(URI.parse(Cable.settings.url))
    getter redis_publish = Redis::Client.new(URI.parse(Cable.settings.url))
    getter fiber_channel = ::Channel({String, String}).new

    @channels = {} of String => Channels
    @channel_mutex = Mutex.new

    def initialize
      subscribe
      process_subscribed_messages
    rescue e
      ExceptionService.notify(e)
      raise e
    end

    def add_connection(connection)
      connections[connection.connection_identifier] = connection
      Cable.instrument(:add_connection)
    end

    def remove_connection(connection_id)
      connections.delete(connection_id).try(&.close)
    end

    def subscribe_channel(channel : Channel, stream_identifier : String)
      @channel_mutex.synchronize do
        if !@channels.has_key?(stream_identifier)
          @channels[stream_identifier] = Channels.new
        end

        @channels[stream_identifier] << channel
      end

      redis_subscribe.encode({"subscribe", stream_identifier})
      redis_subscribe.flush
    end

    def unsubscribe_channel(channel : Channel, stream_identifier : String)
      @channel_mutex.synchronize do
        if @channels.has_key?(stream_identifier)
          @channels[stream_identifier].delete(channel)

          if @channels[stream_identifier].size == 0
            redis_subscribe.unsubscribe stream_identifier
            @channels.delete(stream_identifier)
          end
        else
          redis_subscribe.unsubscribe stream_identifier
        end
      end
    end

    # redis only accepts strings, so we should be strict here
    def publish(stream_identifier : String, message : String)
      redis_publish.publish(stream_identifier, message)
    end

    def send_to_channels(stream_identifier, message)
      return unless @channels.has_key?(stream_identifier)

      parsed_message = safe_decode_message(message)

      @channels[stream_identifier].each do |channel|
        if channel.connection.socket.closed?
          channel.close
        else
          Cable::Logger.debug { {logger: "Cable", message: "#{channel.class} transmitting #{parsed_message} (via streamed from #{channel.stream_identifier})"} }
          channel.connection.socket.send({
            identifier: channel.identifier,
            message:    parsed_message,
          }.to_json)
        end

        Cable.instrument(:broadcast, tags: ["channel:#{channel.class}"])
      rescue e : IO::Error
        Cable::Logger.error { {logger: "Cable", message: "IO::Error Cable::Server#send_to_channels Exception: #{e.message} -> #{self.class.name}#send_to_channels(channel, message)"} }
        ExceptionService.notify(e)
      end
    end

    def safe_decode_message(message)
      case message
      when String
        JSON.parse(message)
      else
        message
      end
    rescue JSON::ParseException
      message
    end

    def shutdown
      redis_subscribe.run({"unsubscribe"})
      redis_subscribe.close
      redis_publish.close

      connections.each do |_, v|
        v.close
      end
    end

    private def process_subscribed_messages
      server = self
      spawn(name: "Cable::Server - process_subscribed_messages") do
        while received = fiber_channel.receive
          stream_identifier, message = received
          server.send_to_channels(stream_identifier, message)
          Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#process_subscribed_messages stream:#{stream_identifier} message:#{message}"} }
        end
      end
    end

    private def subscribe
      spawn(name: "Cable::Server - subscribe") do
        redis_subscribe.subscribe("_internal") do |subscription|
          subscription.on_message do |channel, message|
            if channel == "_internal" && message == "ping"
              Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#subscribe -> PONG"} }
            elsif channel == "_internal" && message == "debug"
              # Cable.server.debug
              Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#subscribe -> ALIVE"} }
            else
              fiber_channel.send({channel, message})
              Cable::Logger.debug { {logger: "Cable", message: "Cable::Server#subscribe channel:#{channel} message:#{message}"} }
            end
          end
        end
      end
    end
  end
end
```

Hope this helps! We had a total nightmare debugging all this, so happy to share our results.
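One small pattern in the server class above worth calling out is `safe_decode_message`: parse the payload as JSON when it's a string, and fall back to the raw value when it isn't a string or doesn't parse. An illustrative Ruby translation:

```ruby
require "json"

# Mirror of Cable::Server#safe_decode_message: JSON strings are parsed;
# non-strings and unparseable strings pass through untouched.
def safe_decode_message(message)
  return message unless message.is_a?(String)

  JSON.parse(message)
rescue JSON::ParserError
  message
end

puts safe_decode_message(%({"type":"welcome"})).class # Hash
puts safe_decode_message("not json")                  # not json
puts safe_decode_message(42)                          # 42
```

This keeps broadcasting tolerant of both structured and plain-text payloads without forcing every publisher to agree on a format.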
I've started porting this over into Cable using this example from @mjeffrey18: cable-cr/cable#48. I am running into a strange issue when running the specs: cable-cr/cable#48 (comment). It seems that when we call […]
@mjeffrey18 by chance are you running Redis in Cluster mode? I've been using the new branch of Cable with this shard for a week now. We were rebooting our app daily until we switched to using this Redis shard. We went a week before needing a reboot due to a memory leak. After the memory leak was found and fixed, I made one last change based on the above code to move the RedisPinger: cable-cr/cable@6b11799. I deployed that yesterday, and less than 24 hours later, our pub/sub just stopped working.

I'm using Redis hosted on AWS ElastiCache in Cluster mode. I'm wondering if maybe a cluster node is randomly dying and swapping to a new one, but since Cable doesn't use the Cluster setup, it just doesn't understand how to fix the connection. We know this happens on the Cable master branch with the old redis shard too, but that one doesn't support Cluster at all.

Our app is just using a normal chat room setup: send a message, that goes to the server, then Redis receives the message and broadcasts back out to all clients. When it breaks, the chat goes to the server, but the broadcast back out never happens. I haven't tried it yet, but I think if I can set up a cluster Redis in Docker locally and then kill off a node, maybe I can recreate this issue.
Hey @jwoertink, I'm not running cluster mode. We use redis.com with half a dozen Redis DBs + failovers for different purposes in production. In staging, we use ElastiCache, but not in cluster mode. Also, it might be worth checking out one of my previous comments:

If you do not receive any IO errors and, no matter what you do, you just can't seem to broadcast, i.e. pub/sub is silently down, it may be that you exceeded the dreaded […]

I'll ping you guys once I've had a chance to review everything.
Hey @jgaskins, thanks for the amazing work on this shard!

I was looking at your fork of cable.cr and noticed you using this connection setup: […]

I'm having a huge issue with long-running sockets, and my debugging efforts have led me to believe Redis is losing connection with this block. I was curious why you used `Redis::Connection` for the pub/sub part? I've manually run Redis publish commands using the CLI etc. and nothing is coming through; I can see the connection in the client list, but for some reason nothing is coming into this block. This issue only happens after an hour or so...

I started implementing a ping/pong server every 3 seconds to attempt to keep the connection open, but it looks like this is not possible with `Redis::Connection`, unless I'm mistaken? Would love to hear if you have experienced something similar, and also whether you think having a ping/pong operation on `Redis::Connection` may help solve this issue?