Skip to content

Commit

Permalink
Fix memory leak (#51)
Browse files Browse the repository at this point in the history
* Fix memory leak

The `Tasker::Task` that pings each websocket was not being cleaned up
and garbage-collected, which meant that the WebSocket itself was also
not being GCed since it (and the WebSocketPinger itself) was referenced
inside the task block, leading to a leak for every single WebSocket.

I had a feeling this was the case because CPU and RAM consumption were
not static after a lot of WebSockets had been connected, even if they
were *no longer* connected. Turns out every single WebSocket was still
being pinged or raising an exception if it were closed. That exception
was not causing the `Tasker::Task` to stop.

* Update src/cable/websocket_pinger.cr

Co-authored-by: Jeremy Woertink <[email protected]>
  • Loading branch information
jgaskins and jwoertink authored Oct 5, 2022
1 parent a6672c4 commit 9e075b0
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/cable/handler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module Cable
# Send welcome message to the client
socket.send({type: Cable.message(:welcome)}.to_json)

Cable::WebsocketPinger.build(socket)
ws_pinger = Cable::WebsocketPinger.build(socket)

socket.on_ping do
socket.pong context.request.path
Expand All @@ -44,6 +44,7 @@ module Cable
end

socket.on_close do
ws_pinger.stop
Cable.server.remove_connection(connection_id)
Cable::Logger.info { "Finished \"#{path}\" [WebSocket] for #{remote_address} at #{Time.utc.to_s}" }
end
Expand Down
7 changes: 6 additions & 1 deletion src/cable/websocket_pinger.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Cable
class PingStoppedException < Exception; end

@@seconds : Int32 | Float64 = 3
@task : Tasker::Task

def self.run_every(value : Int32 | Float64, &block)
@@seconds = value
Expand All @@ -23,10 +24,14 @@ module Cable
end

def initialize(@socket : HTTP::WebSocket)
Tasker.every(Cable::WebsocketPinger.seconds.seconds) do
@task = Tasker.every(Cable::WebsocketPinger.seconds.seconds) do
raise PingStoppedException.new("Stopped") if @socket.closed?
@socket.send({type: Cable.message(:ping), message: Time.utc.to_unix}.to_json)
end
end

def stop
@task.cancel
end
end
end

0 comments on commit 9e075b0

Please sign in to comment.