Skip to content

Commit

Permalink
all connect specs working except replace client with new connection
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Sep 23, 2024
1 parent b83ea2b commit 7cf822c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 18 deletions.
15 changes: 8 additions & 7 deletions spec/mqtt/integrations/connect_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module MqttSpecs
extend MqttMatchers
describe "connect [MQTT-3.1.4-1]" do
describe "when client already connected" do
pending "should replace the already connected client [MQTT-3.1.4-2]" do
it "should replace the already connected client [MQTT-3.1.4-2]" do
with_server do |server|
with_client_io(server) do |io|
connect(io, false)
Expand Down Expand Up @@ -163,7 +163,8 @@ module MqttSpecs
end
end

pending "for password flag set without username flag set [MQTT-3.1.2-22]" do
# TODO: rescue and log error
it "for password flag set without username flag set [MQTT-3.1.2-22]" do
with_server do |server|
with_client_io(server) do |io|
connect = MQTT::Protocol::Connect.new(
Expand All @@ -186,7 +187,7 @@ module MqttSpecs
end

describe "tcp socket is closed [MQTT-3.1.4-1]" do
pending "if first packet is not a CONNECT [MQTT-3.1.0-1]" do
it "if first packet is not a CONNECT [MQTT-3.1.0-1]" do
with_server do |server|
with_client_io(server) do |io|
ping(io)
Expand All @@ -195,7 +196,7 @@ module MqttSpecs
end
end

pending "for a second CONNECT packet [MQTT-3.1.0-2]" do
it "for a second CONNECT packet [MQTT-3.1.0-2]" do
with_server do |server|
with_client_io(server) do |io|
connect(io)
Expand All @@ -206,7 +207,7 @@ module MqttSpecs
end
end

pending "for invalid client id [MQTT-3.1.3-4]." do
it "for invalid client id [MQTT-3.1.3-4]." do
with_server do |server|
with_client_io(server) do |io|
MQTT::Protocol::Connect.new(
Expand All @@ -223,7 +224,7 @@ module MqttSpecs
end
end

pending "for invalid protocol name [MQTT-3.1.2-1]" do
it "for invalid protocol name [MQTT-3.1.2-1]" do
with_server do |server|
with_client_io(server) do |io|
connect = MQTT::Protocol::Connect.new(
Expand All @@ -244,7 +245,7 @@ module MqttSpecs
end
end

pending "for reserved bit set [MQTT-3.1.2-3]" do
it "for reserved bit set [MQTT-3.1.2-3]" do
with_server do |server|
with_client_io(server) do |io|
connect = MQTT::Protocol::Connect.new(
Expand Down
22 changes: 14 additions & 8 deletions src/lavinmq/mqtt/broker.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,34 @@ module LavinMQ
module MQTT
class Broker

getter vhost
getter vhost, sessions
def initialize(@vhost : VHost)
@queues = Hash(String, Session).new
@sessions = Hash(String, Session).new
@clients = Hash(String, Client).new
end

def clean_session?(client_id : String) : Bool
session = @sessions[client_id]?
return false if session.nil?
session.set_clean_session
def connect_client(socket, connection_info, user, vhost, packet)
if prev_client = @clients[packet.client_id]?
Log.trace { "Found previous client connected with client_id: #{packet.client_id}, closing" }
pp "rev client"
prev_client.close
end
client = MQTT::Client.new(socket, connection_info, user, vhost, self, packet.client_id, packet.clean_session?, packet.will)
@clients[packet.client_id] = client
client
end

def session_present?(client_id : String, clean_session) : Bool
session = @sessions[client_id]?
clean_session?(client_id)
return false if session.nil? || clean_session
pp "session_present? #{session.inspect}"
return false if session.nil? || clean_session && session.set_clean_session
true
end

def start_session(client_id, clean_session)
session = MQTT::Session.new(@vhost, client_id)
session.clean_session if clean_session
session.set_clean_session if clean_session
@sessions[client_id] = session
@queues[client_id] = session
end
Expand Down
4 changes: 4 additions & 0 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ module LavinMQ
rescue ex : ::IO::EOFError
Log.info { "eof #{ex.inspect}" }
ensure
pp "ensuring"
publish_will if @will
@broker.clear_session(client_id) if @clean_session
@socket.close
Expand Down Expand Up @@ -169,6 +170,9 @@ module LavinMQ
end

def close(reason = "")
Log.trace { "Client#close" }
@closed = true
@socket.close
end

def force_close
Expand Down
4 changes: 1 addition & 3 deletions src/lavinmq/mqtt/connection_factory.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ module LavinMQ
end

def start(socket : ::IO, connection_info : ConnectionInfo)

io = MQTT::IO.new(socket)
if packet = MQTT::Packet.from_io(socket).as?(MQTT::Connect)
Log.trace { "recv #{packet.inspect}" }
if user = authenticate(io, packet)
session_present = @broker.session_present?(packet.client_id, packet.clean_session?)
pp "in connection_factory: #{session_present}"
MQTT::Connack.new(session_present, MQTT::Connack::ReturnCode::Accepted).to_io(io)
io.flush
return LavinMQ::MQTT::Client.new(socket, connection_info, user, @vhost, @broker, packet.client_id, packet.clean_session?, packet.will)
return @broker.connect_client(socket, connection_info, user, @vhost, packet)
end
end
rescue ex : MQTT::Error::Connect
Expand Down
8 changes: 8 additions & 0 deletions src/lavinmq/mqtt/session.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@ module LavinMQ
end

def set_clean_session
pp "Setting clean session"
clear_session
@clean_session = true
end


#Maybe use something other than purge?
def clear_session
purge
end

#TODO: implement subscribers array and session_present? and send instead of false
def connect(client)
client.send(MQTT::Connack.new(false, MQTT::Connack::ReturnCode::Accepted))
Expand Down

0 comments on commit 7cf822c

Please sign in to comment.