diff --git a/spec/mqtt/integrations/connect_spec.cr b/spec/mqtt/integrations/connect_spec.cr index 8522b8ab6..d2bf9a9f1 100644 --- a/spec/mqtt/integrations/connect_spec.cr +++ b/spec/mqtt/integrations/connect_spec.cr @@ -5,7 +5,7 @@ module MqttSpecs extend MqttMatchers describe "connect [MQTT-3.1.4-1]" do describe "when client already connected" do - pending "should replace the already connected client [MQTT-3.1.4-2]" do + it "should replace the already connected client [MQTT-3.1.4-2]" do with_server do |server| with_client_io(server) do |io| connect(io, false) @@ -163,7 +163,8 @@ module MqttSpecs end end - pending "for password flag set without username flag set [MQTT-3.1.2-22]" do + # TODO: rescue and log error + it "for password flag set without username flag set [MQTT-3.1.2-22]" do with_server do |server| with_client_io(server) do |io| connect = MQTT::Protocol::Connect.new( @@ -186,7 +187,7 @@ module MqttSpecs end describe "tcp socket is closed [MQTT-3.1.4-1]" do - pending "if first packet is not a CONNECT [MQTT-3.1.0-1]" do + it "if first packet is not a CONNECT [MQTT-3.1.0-1]" do with_server do |server| with_client_io(server) do |io| ping(io) @@ -195,7 +196,7 @@ module MqttSpecs end end - pending "for a second CONNECT packet [MQTT-3.1.0-2]" do + it "for a second CONNECT packet [MQTT-3.1.0-2]" do with_server do |server| with_client_io(server) do |io| connect(io) @@ -206,7 +207,7 @@ module MqttSpecs end end - pending "for invalid client id [MQTT-3.1.3-4]." do + it "for invalid client id [MQTT-3.1.3-4]." do with_server do |server| with_client_io(server) do |io| MQTT::Protocol::Connect.new( @@ -223,7 +224,7 @@ module MqttSpecs end end - pending "for invalid protocol name [MQTT-3.1.2-1]" do + it "for invalid protocol name [MQTT-3.1.2-1]" do with_server do |server| with_client_io(server) do |io| connect = MQTT::Protocol::Connect.new( @@ -244,7 +245,7 @@ module MqttSpecs end end - pending "for reserved bit set [MQTT-3.1.2-3]" do + it "for reserved bit set [MQTT-3.1.2-3]" do with_server do |server| with_client_io(server) do |io| connect = MQTT::Protocol::Connect.new( diff --git a/src/lavinmq/mqtt/broker.cr b/src/lavinmq/mqtt/broker.cr index 1788c1546..1dd334590 100644 --- a/src/lavinmq/mqtt/broker.cr +++ b/src/lavinmq/mqtt/broker.cr @@ -2,28 +2,34 @@ module LavinMQ module MQTT class Broker - getter vhost + getter vhost, sessions def initialize(@vhost : VHost) @queues = Hash(String, Session).new @sessions = Hash(String, Session).new + @clients = Hash(String, Client).new end - def clean_session?(client_id : String) : Bool - session = @sessions[client_id]? - return false if session.nil? - session.set_clean_session + def connect_client(socket, connection_info, user, vhost, packet) + if prev_client = @clients[packet.client_id]? + Log.trace { "Found previous client connected with client_id: #{packet.client_id}, closing" } + pp "rev client" + prev_client.close + end + client = MQTT::Client.new(socket, connection_info, user, vhost, self, packet.client_id, packet.clean_session?, packet.will) + @clients[packet.client_id] = client + client end def session_present?(client_id : String, clean_session) : Bool session = @sessions[client_id]? - clean_session?(client_id) - return false if session.nil? || clean_session + pp "session_present? #{session.inspect}" + return false if session.nil? || clean_session && session.set_clean_session true end def start_session(client_id, clean_session) session = MQTT::Session.new(@vhost, client_id) - session.clean_session if clean_session + session.set_clean_session if clean_session @sessions[client_id] = session @queues[client_id] = session end diff --git a/src/lavinmq/mqtt/client.cr b/src/lavinmq/mqtt/client.cr index 3d56a6b5b..5258a4f69 100644 --- a/src/lavinmq/mqtt/client.cr +++ b/src/lavinmq/mqtt/client.cr @@ -56,6 +56,7 @@ module LavinMQ rescue ex : ::IO::EOFError Log.info { "eof #{ex.inspect}" } ensure + pp "ensuring" publish_will if @will @broker.clear_session(client_id) if @clean_session @socket.close @@ -169,6 +170,9 @@ module LavinMQ end def close(reason = "") + Log.trace { "Client#close" } + @closed = true + @socket.close end def force_close diff --git a/src/lavinmq/mqtt/connection_factory.cr b/src/lavinmq/mqtt/connection_factory.cr index b8dda7111..1d8574a09 100644 --- a/src/lavinmq/mqtt/connection_factory.cr +++ b/src/lavinmq/mqtt/connection_factory.cr @@ -15,16 +15,14 @@ module LavinMQ end def start(socket : ::IO, connection_info : ConnectionInfo) - io = MQTT::IO.new(socket) if packet = MQTT::Packet.from_io(socket).as?(MQTT::Connect) Log.trace { "recv #{packet.inspect}" } if user = authenticate(io, packet) session_present = @broker.session_present?(packet.client_id, packet.clean_session?) - pp "in connection_factory: #{session_present}" MQTT::Connack.new(session_present, MQTT::Connack::ReturnCode::Accepted).to_io(io) io.flush - return LavinMQ::MQTT::Client.new(socket, connection_info, user, @vhost, @broker, packet.client_id, packet.clean_session?, packet.will) + return @broker.connect_client(socket, connection_info, user, @vhost, packet) end end rescue ex : MQTT::Error::Connect diff --git a/src/lavinmq/mqtt/session.cr b/src/lavinmq/mqtt/session.cr index 673e12375..e07fb8f14 100644 --- a/src/lavinmq/mqtt/session.cr +++ b/src/lavinmq/mqtt/session.cr @@ -12,9 +12,17 @@ module LavinMQ end def set_clean_session + pp "Setting clean session" + clear_session @clean_session = true end + + #Maybe use something other than purge? + def clear_session + purge + end + #TODO: implement subscribers array and session_present? and send instead of false def connect(client) client.send(MQTT::Connack.new(false, MQTT::Connack::ReturnCode::Accepted))