Skip to content

Commit

Permalink
it compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
snichme committed Sep 24, 2024
1 parent 567b5d1 commit 2b3d56f
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 52 deletions.
9 changes: 0 additions & 9 deletions src/lavinmq/exchange/exchange.cr
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,8 @@ module LavinMQ
@arguments == frame_args
end

def queue_bindings : Hash(BindingKey, Set(Queue))
Hash(BindingKey, Set(Queue)).new
end

# def exchange_bindings : Hash(BindingKey, Set(Exchange))
# Hash(BindingKey, Set(Exchange)).new
# end

def in_use?
return true unless all_bindings.empty?
# return true if exchange_bindings.size > 0
@vhost.exchanges.any? do |_, x|
x.all_bindings.each.select(Exchange).any? { |ex| ex == self }
end
Expand Down
12 changes: 1 addition & 11 deletions src/lavinmq/http/binding_helpers.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,7 @@ module LavinMQ
end
end

private def binding_for_props(context, source, destination : Queue, props)
binding = source.queue_bindings.find do |k, v|
v.includes?(destination) && BindingDetails.hash_key(k) == props
end
unless binding
not_found(context, "Binding '#{props}' on exchange '#{source.name}' -> queue '#{destination.name}' does not exist")
end
binding
end

private def binding_for_props(context, source, destination : Exchange, props)
private def binding_for_props(context, source, destination : Destination, props)
binding = source.bindings_details.find do |bd|
bd.destination == destination && bd.properties_key == props
end
Expand Down
30 changes: 16 additions & 14 deletions src/lavinmq/http/controller/bindings.cr
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ module LavinMQ
e = exchange(context, params, vhost)
q = queue(context, params, vhost, "queue")
props = URI.decode_www_form(params["props"])
binding = binding_for_props(context, e, q, props)
BindingDetails.new(e.name, vhost, binding[0][0], binding[0][1], q).to_json(context.response)
binding_for_props(context, e, q, props).to_json(context.response)
end
end

Expand All @@ -98,12 +97,14 @@ module LavinMQ
end
props = URI.decode_www_form(params["props"])
found = false
e.queue_bindings.each do |k, destinations|
next unless destinations.includes?(q) && BindingDetails.hash_key(k) == props
arguments = k[1] || AMQP::Table.new
@amqp_server.vhosts[vhost].unbind_queue(q.name, e.name, k[0], arguments)
e.bindings_details.each do |binding|
next unless binding.destination == q && binding.properties_key == props
arguments = binding.arguments || AMQP::Table.new
@amqp_server.vhosts[vhost].unbind_queue(q.name, e.name,
binding.routing_key, arguments)
found = true
Log.debug { "exchange '#{e.name}' unbound from queue '#{q.name}' with key '#{k}'" }
Log.debug { "exchange '#{e.name}' unbound from queue '#{q.name}' with " \
" key '#{binding.routing_key}'" }
break
end
context.response.status_code = found ? 204 : 404
Expand Down Expand Up @@ -177,13 +178,14 @@ module LavinMQ
end
props = URI.decode_www_form(params["props"])
found = false
source.exchange_bindings.each do |k, destinations|
next unless destinations.includes?(destination) && BindingDetails.hash_key(k) == props
arguments = k[1] || AMQP::Table.new
@amqp_server.vhosts[vhost].unbind_exchange(destination.name, source.name, k[0], arguments)
found = true
break
end
# TODO
# source.exchange_bindings.each do |k, destinations|
# next unless destinations.includes?(destination) && BindingDetails.hash_key(k) == props
# arguments = k[1] || AMQP::Table.new
# @amqp_server.vhosts[vhost].unbind_exchange(destination.name, source.name, k[0], arguments)
# found = true
# break
# end
context.response.status_code = found ? 204 : 404
end
end
Expand Down
6 changes: 3 additions & 3 deletions src/lavinmq/http/controller/queues.cr
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ module LavinMQ
with_vhost(context, params) do |vhost|
refuse_unless_management(context, user(context), vhost)
queue = queue(context, params, vhost)
itr = queue.bindings.map { |exchange, args| BindingDetails.new(exchange.name, vhost, args[0], args[1], queue) }
default_binding = BindingDetails.new("", queue.vhost.name, queue.name, nil, queue)
page(context, {default_binding}.each.chain(itr))
itr = queue.bindings # .map { |exchange, args| BindingDetails.new(exchange.name, vhost, args[0], args[1], queue) }
# default_binding = BindingDetails.new("", queue.vhost.name, queue.name, nil, queue)
page(context, itr) # {default_binding}.each.chain(itr))
end
end

Expand Down
27 changes: 12 additions & 15 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ module LavinMQ
when AMQP::Frame::Queue::Delete
if q = @queues.delete(f.queue_name)
@exchanges.each_value do |ex|
ex.queue_bindings.each do |binding_args, destinations|
ex.unbind(q, *binding_args) if destinations.includes?(q)
end
# ex.queue_bindings.each do |binding_args, destinations|
# ex.unbind(q, *binding_args) if destinations.includes?(q)
# end
end
store_definition(f, dirty: true) if !loading && q.durable? && !q.exclusive?
event_tick(EventType::QueueDeleted) unless loading
Expand All @@ -281,12 +281,9 @@ module LavinMQ
alias QueueBinding = Tuple(BindingKey, Exchange)

def queue_bindings(queue : Queue)
iterators = @exchanges.each_value.map do |ex|
ex.queue_bindings.each.select do |(_binding_args, destinations)|
destinations.includes?(queue)
end.map { |(binding_args, _destinations)| {ex, binding_args} }
@exchanges.each_value.flat_map do |ex|
ex.bindings_details.select { |binding| binding.destination == queue }
end
Iterator(QueueBinding).chain(iterators)
end

def add_operator_policy(name : String, pattern : String, apply_to : String,
Expand Down Expand Up @@ -542,13 +539,13 @@ module LavinMQ
io.write_bytes f
end
@exchanges.each_value.select(&.durable?).each do |e|
e.queue_bindings.each do |(routing_key, arguments), queues|
args = arguments || AMQP::Table.new
queues.each do |q|
f = AMQP::Frame::Queue::Bind.new(0_u16, 0_u16, q.name, e.name, routing_key, false, args)
io.write_bytes f
end
end
# e.queue_bindings.each do |(routing_key, arguments), queues|
# args = arguments || AMQP::Table.new
# queues.each do |q|
# f = AMQP::Frame::Queue::Bind.new(0_u16, 0_u16, q.name, e.name, routing_key, false, args)
# io.write_bytes f
# end
# end
# e.exchange_bindings.each do |(routing_key, arguments), exchanges|
# args = arguments || AMQP::Table.new
# exchanges.each do |ex|
Expand Down

0 comments on commit 2b3d56f

Please sign in to comment.