Skip to content

Commit

Permalink
pass more specs and handle session_present
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Sep 23, 2024
1 parent 7130ee3 commit b83ea2b
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 12 deletions.
2 changes: 1 addition & 1 deletion spec/mqtt/integrations/connect_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module MqttSpecs
with_client_io(server) do |io|
connect(io, clean_session: false)

# Myra won't save sessions without subscriptions
# LavinMQ won't save sessions without subscriptions
subscribe(io,
topic_filters: [subtopic("a/topic", 0u8)],
packet_id: 1u16
Expand Down
25 changes: 19 additions & 6 deletions src/lavinmq/mqtt/broker.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,29 @@ module LavinMQ
@sessions = Hash(String, Session).new
end

def start_session(client : Client)
client_id = client.client_id
session = MQTT::Session.new(self, client_id)
def clean_session?(client_id : String) : Bool
session = @sessions[client_id]?
return false if session.nil?
session.set_clean_session
end

def session_present?(client_id : String, clean_session) : Bool
session = @sessions[client_id]?
clean_session?(client_id)
return false if session.nil? || clean_session
true
end

def start_session(client_id, clean_session)
session = MQTT::Session.new(@vhost, client_id)
session.clean_session if clean_session
@sessions[client_id] = session
@queues[client_id] = session
end

def clear_session(client : Client)
@sessions.delete client.client_id
@queues.delete client.client_id
def clear_session(client_id)
@sessions.delete client_id
@queues.delete client_id
end

# def connected(client) : MQTT::Session
Expand Down
8 changes: 4 additions & 4 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ module LavinMQ
Log.info { "eof #{ex.inspect}" }
ensure
publish_will if @will
disconnect_session(self) if @clean_session
@broker.clear_session(client_id) if @clean_session
@socket.close
@broker.vhost.rm_connection(self)
end
Expand Down Expand Up @@ -114,6 +114,7 @@ module LavinMQ
auto_delete = true
tbl = AMQP::Table.new
# TODO: declare Session instead
@broker.start_session(@client_id, @clean_session)
q = @broker.vhost.declare_queue(name, durable, auto_delete, tbl)
qos = Array(MQTT::SubAck::ReturnCode).new
packet.topic_filters.each do |tf|
Expand Down Expand Up @@ -145,21 +146,20 @@ module LavinMQ

def start_session(client) : MQTT::Session
if @clean_session
pp "clear session"
Log.trace { "clear session" }
@broker.clear_session(client)
end
@broker.start_session(client)
end

def disconnect_session(client)
pp "disconnect session"
Log.trace { "disconnect session" }
@broker.clear_session(client)
end

# TODO: actually publish will to session
private def publish_will
if will = @will
pp "Publish will to session"
end
rescue ex
Log.warn { "Failed to publish will: #{ex.message}" }
Expand Down
4 changes: 3 additions & 1 deletion src/lavinmq/mqtt/connection_factory.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ module LavinMQ
if packet = MQTT::Packet.from_io(socket).as?(MQTT::Connect)
Log.trace { "recv #{packet.inspect}" }
if user = authenticate(io, packet)
MQTT::Connack.new(false, MQTT::Connack::ReturnCode::Accepted).to_io(io)
session_present = @broker.session_present?(packet.client_id, packet.clean_session?)
pp "in connection_factory: #{session_present}"
MQTT::Connack.new(session_present, MQTT::Connack::ReturnCode::Accepted).to_io(io)
io.flush
return LavinMQ::MQTT::Client.new(socket, connection_info, user, @vhost, @broker, packet.client_id, packet.clean_session?, packet.will)
end
Expand Down
5 changes: 5 additions & 0 deletions src/lavinmq/mqtt/session.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module LavinMQ
module MQTT
class Session < Queue
@clean_session : Bool = false
getter clean_session
def initialize(@vhost : VHost,
@name : String,
@exclusive = true,
Expand All @@ -9,6 +11,9 @@ module LavinMQ
super
end

def set_clean_session
@clean_session = true
end

#TODO: implement subscribers array and session_present? and send instead of false
def connect(client)
Expand Down

0 comments on commit b83ea2b

Please sign in to comment.