From 9e075b0d1882c9d74590243c362a951cf06c72dd Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 5 Oct 2022 11:22:26 -0400 Subject: [PATCH] Fix memory leak (#51) * Fix memory leak The `Tasker::Task` that pings each websocket was not being cleaned up and garbage-collected, which meant that the WebSocket itself was also not being GCed since it (and the WebSocketPinger itself) was referenced inside the task block, leading to a leak for every single WebSocket. I had a feeling this was the case because CPU and RAM consumption were not static after a lot of WebSockets had been connected, even if they were *no longer* connected. Turns out every single WebSocket was still being pinged or raising an exception if it were closed. That exception was not causing the `Tasker::Task` to stop. * Update src/cable/websocket_pinger.cr Co-authored-by: Jeremy Woertink --- src/cable/handler.cr | 3 ++- src/cable/websocket_pinger.cr | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/cable/handler.cr b/src/cable/handler.cr index 5194dd7..e3b26d7 100644 --- a/src/cable/handler.cr +++ b/src/cable/handler.cr @@ -25,7 +25,7 @@ module Cable # Send welcome message to the client socket.send({type: Cable.message(:welcome)}.to_json) - Cable::WebsocketPinger.build(socket) + ws_pinger = Cable::WebsocketPinger.build(socket) socket.on_ping do socket.pong context.request.path @@ -44,6 +44,7 @@ module Cable end socket.on_close do + ws_pinger.stop Cable.server.remove_connection(connection_id) Cable::Logger.info { "Finished \"#{path}\" [WebSocket] for #{remote_address} at #{Time.utc.to_s}" } end diff --git a/src/cable/websocket_pinger.cr b/src/cable/websocket_pinger.cr index c607e03..168d856 100644 --- a/src/cable/websocket_pinger.cr +++ b/src/cable/websocket_pinger.cr @@ -5,6 +5,7 @@ module Cable class PingStoppedException < Exception; end @@seconds : Int32 | Float64 = 3 + @task : Tasker::Task def self.run_every(value : Int32 | Float64, &block) @@seconds = value @@ -23,10 +24,14 @@ module Cable end def initialize(@socket : HTTP::WebSocket) - Tasker.every(Cable::WebsocketPinger.seconds.seconds) do + @task = Tasker.every(Cable::WebsocketPinger.seconds.seconds) do raise PingStoppedException.new("Stopped") if @socket.closed? @socket.send({type: Cable.message(:ping), message: Time.utc.to_unix}.to_json) end end + + def stop + @task.cancel + end end end