diff --git a/spec/mqtt/integrations/connect_spec.cr b/spec/mqtt/integrations/connect_spec.cr index d2bf9a9f1..f81f0cb6e 100644 --- a/spec/mqtt/integrations/connect_spec.cr +++ b/spec/mqtt/integrations/connect_spec.cr @@ -8,7 +8,7 @@ module MqttSpecs it "should replace the already connected client [MQTT-3.1.4-2]" do with_server do |server| with_client_io(server) do |io| - connect(io, false) + connect(io) with_client_io(server) do |io2| connect(io2) io.should be_closed diff --git a/src/lavinmq/mqtt/broker.cr b/src/lavinmq/mqtt/broker.cr index 1dd334590..37b38da2d 100644 --- a/src/lavinmq/mqtt/broker.cr +++ b/src/lavinmq/mqtt/broker.cr @@ -23,7 +23,7 @@ module LavinMQ def session_present?(client_id : String, clean_session) : Bool session = @sessions[client_id]? pp "session_present? #{session.inspect}" - return false if session.nil? || clean_session && session.set_clean_session + return false if session.nil? || ( clean_session && session.set_clean_session ) true end diff --git a/src/lavinmq/mqtt/client.cr b/src/lavinmq/mqtt/client.cr index 5258a4f69..8278cb72d 100644 --- a/src/lavinmq/mqtt/client.cr +++ b/src/lavinmq/mqtt/client.cr @@ -43,7 +43,7 @@ module LavinMQ private def read_loop loop do - Log.trace { "waiting for packet" } + @log.trace { "waiting for packet" } packet = read_and_handle_packet # The disconnect packet has been handled and the socket has been closed. # If we dont breakt the loop here we'll get a IO/Error on next read. @@ -52,9 +52,9 @@ module LavinMQ rescue ex : ::MQTT::Protocol::Error::PacketDecode @socket.close rescue ex : MQTT::Error::Connect - Log.warn { "Connect error #{ex.inspect}" } - rescue ex : ::IO::EOFError - Log.info { "eof #{ex.inspect}" } + @log.warn { "Connect error #{ex.inspect}" } + rescue ex : ::IO::Error + @log.warn(exception: ex) { "Read Loop error" } ensure pp "ensuring" publish_will if @will @@ -65,7 +65,7 @@ module LavinMQ def read_and_handle_packet packet : MQTT::Packet = MQTT::Packet.from_io(@io) - Log.info { "recv #{packet.inspect}" } + @log.info { "recv #{packet.inspect}" } @recv_oct_count += packet.bytesize case packet @@ -147,14 +147,14 @@ module LavinMQ def start_session(client) : MQTT::Session if @clean_session - Log.trace { "clear session" } + @log.trace { "clear session" } @broker.clear_session(client) end @broker.start_session(client) end def disconnect_session(client) - Log.trace { "disconnect session" } + @log.trace { "disconnect session" } @broker.clear_session(client) end @@ -163,14 +163,14 @@ module LavinMQ if will = @will end rescue ex - Log.warn { "Failed to publish will: #{ex.message}" } + @log.warn { "Failed to publish will: #{ex.message}" } end def update_rates end def close(reason = "") - Log.trace { "Client#close" } + @log.trace { "Client#close" } @closed = true @socket.close end