Skip to content

Commit

Permalink
exchange.publish
Browse files Browse the repository at this point in the history
  • Loading branch information
snichme authored and kickster97 committed Oct 1, 2024
1 parent 6483a71 commit 4ef9cc4
Show file tree
Hide file tree
Showing 19 changed files with 526 additions and 419 deletions.
4 changes: 2 additions & 2 deletions spec/api/bindings_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ describe LavinMQ::HTTP::BindingsController do
response = http.post("/api/bindings/%2f/e/be1/q/bindings_q1", body: body)
response.status_code.should eq 201
response.headers["Location"].should eq "bindings_q1/rk"
s.vhosts["/"].exchanges["be1"].queue_bindings.last_key.routing_key.should eq "rk"
s.vhosts["/"].exchanges["be1"].bindings_details.first.routing_key.should eq "rk"
end
end

Expand Down Expand Up @@ -127,7 +127,7 @@ describe LavinMQ::HTTP::BindingsController do
props = binding[0]["properties_key"].as_s
response = http.delete("/api/bindings/%2f/e/be1/q/bindings_q1/#{props}")
response.status_code.should eq 204
s.vhosts["/"].exchanges["be1"].queue_bindings.empty?.should be_true
s.vhosts["/"].exchanges["be1"].bindings_details.empty?.should be_true
end
end
end
Expand Down
38 changes: 26 additions & 12 deletions spec/api/definitions_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,20 @@ describe LavinMQ::HTTP::Server do
]})
response = http.post("/api/definitions", body: body)
response.status_code.should eq 200
matches = [] of String
ex = s.vhosts["/"].exchanges["import_x1"]
ex.do_exchange_matches("r.k2", nil) { |e| matches << e.name }
matches.includes?("import_x2").should be_true
matches.clear
ex.do_queue_matches("rk", nil) { |e| matches << e.name }
matches.includes?("import_q1").should be_true
qs = Set(LavinMQ::Queue).new
es = Set(LavinMQ::Exchange).new
ex.find_queues("r.k2", nil, qs, es)
res = Set(LavinMQ::Exchange).new
res << s.vhosts["/"].exchanges["import_x1"]
res << s.vhosts["/"].exchanges["import_x2"]
es.should eq res
qs = Set(LavinMQ::Queue).new
es = Set(LavinMQ::Exchange).new
ex.find_queues("rk", nil, qs, es)
res = Set(LavinMQ::Queue).new
res << s.vhosts["/"].queues["import_q1"]
qs.should eq res
end
end

Expand Down Expand Up @@ -470,13 +477,20 @@ describe LavinMQ::HTTP::Server do
]})
response = http.post("/api/definitions/%2f", body: body)
response.status_code.should eq 200
matches = [] of String
ex = s.vhosts["/"].exchanges["import_x1"]
ex.do_exchange_matches("r.k2", nil) { |e| matches << e.name }
matches.includes?("import_x2").should be_true
matches.clear
ex.do_queue_matches("rk", nil) { |e| matches << e.name }
matches.includes?("import_q1").should be_true
qs = Set(LavinMQ::Queue).new
es = Set(LavinMQ::Exchange).new
ex.find_queues("r.k2", nil, qs, es)
res = Set(LavinMQ::Exchange).new
res << s.vhosts["/"].exchanges["import_x1"]
res << s.vhosts["/"].exchanges["import_x2"]
es.should eq res
qs = Set(LavinMQ::Queue).new
es = Set(LavinMQ::Exchange).new
ex.find_queues("rk", nil, qs, es)
res = Set(LavinMQ::Queue).new
res << s.vhosts["/"].queues["import_q1"]
qs.should eq res
end
end

Expand Down
126 changes: 120 additions & 6 deletions spec/message_routing_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ module LavinMQ
# Monkey patch for backward compability and easier testing
def matches(routing_key, headers = nil) : Set(Queue | Exchange)
s = Set(Queue | Exchange).new
queue_matches(routing_key, headers) { |q| s << q }
exchange_matches(routing_key, headers) { |x| s << x }
qs = Set(Queue).new
es = Set(Exchange).new
find_queues(routing_key, headers, qs, es)
qs.each { |q| s << q }
s
end
end
Expand All @@ -19,15 +21,20 @@ describe LavinMQ::DirectExchange do
q1 = LavinMQ::Queue.new(vhost, "q1")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.matches("q1").should eq(Set{q1})
found_queues = Set(LavinMQ::Queue).new
x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new)
found_queues.should eq(Set{q1})
end
end

it "matches no rk" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
x = LavinMQ::DirectExchange.new(vhost, "")
x.matches("q1").should be_empty

found_queues = Set(LavinMQ::Queue).new
x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new)
found_queues.should be_empty
end
end
end
Expand All @@ -39,15 +46,20 @@ describe LavinMQ::FanoutExchange do
q1 = LavinMQ::Queue.new(vhost, "q1")
x = LavinMQ::FanoutExchange.new(vhost, "")
x.bind(q1, "")
x.matches("any").should eq(Set{q1})

found_queues = Set(LavinMQ::Queue).new
x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new)
found_queues.should eq(Set{q1})
end
end

it "matches no rk" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
x = LavinMQ::FanoutExchange.new(vhost, "")
x.matches("q1").should be_empty
found_queues = Set(LavinMQ::Queue).new
x.find_queues("q1", nil, found_queues, Set(LavinMQ::Exchange).new)
found_queues.should be_empty
end
end
end
Expand Down Expand Up @@ -307,3 +319,105 @@ describe LavinMQ::HeadersExchange do
end
end
end

describe LavinMQ::Exchange do
it "should handle CC in header" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["CC"] = ["q2"]
x.find_queues("q1", headers, found_queues)
found_queues.should eq(Set{q1, q2})
end
end
it "should raise unless CC header isn't array" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["CC"] = "q2"
expect_raises(LavinMQ::Error::PreconditionFailed) do
x.find_queues("q1", headers, found_queues)
end
end
end

it "should handle BCC in header" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["BCC"] = ["q2"]
x.find_queues("q1", headers, found_queues)
found_queues.should eq(Set{q1, q2})
end
end

it "should raise unless BCC header isn't array" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["BCC"] = "q2"
expect_raises(LavinMQ::Error::PreconditionFailed) do
x.find_queues("q1", headers, found_queues)
end
end
end

it "should drop BCC from header" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["BCC"] = ["q2"]
x.find_queues("q1", headers, found_queues)
headers["BCC"]?.should be_nil
end
end

it "should read both CC and BCC" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
q2 = LavinMQ::Queue.new(vhost, "q2")
q3 = LavinMQ::Queue.new(vhost, "q3")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.bind(q2, "q2", LavinMQ::AMQP::Table.new)
x.bind(q3, "q3", LavinMQ::AMQP::Table.new)
found_queues = Set(LavinMQ::Queue).new
headers = LavinMQ::AMQP::Table.new
headers["CC"] = ["q2"]
headers["BCC"] = ["q3"]
x.find_queues("q1", headers, found_queues)
found_queues.should eq(Set{q1, q2, q3})
end
end
end
13 changes: 6 additions & 7 deletions spec/queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe LavinMQ::Queue do
x.publish_confirm "test message", q.name
q.get(no_ack: true).try(&.body_io.to_s).should eq("test message")

iq = s.vhosts["/"].exchanges[x_name].queue_bindings[LavinMQ::BindingKey.new(q.name, nil)].first
iq = s.vhosts["/"].queues[q.name]
iq.pause!

x.publish_confirm "test message 2", q.name
Expand Down Expand Up @@ -93,7 +93,7 @@ describe LavinMQ::Queue do
x.publish_confirm "test message", q.name
q.get(no_ack: true).try(&.body_io.to_s).should eq("test message")

iq = s.vhosts["/"].exchanges[x_name].queue_bindings[LavinMQ::BindingKey.new(q.name, nil)].first
iq = s.vhosts["/"].queues[q.name]
iq.pause!

x.publish_confirm "test message 2", q.name
Expand Down Expand Up @@ -126,7 +126,7 @@ describe LavinMQ::Queue do
x.publish_confirm "test message", q.name
q.get(no_ack: true).try(&.body_io.to_s).should eq("test message")

iq = s.vhosts["/"].exchanges[x_name].queue_bindings[LavinMQ::BindingKey.new(q.name, nil)].first
iq = s.vhosts["/"].queues[q.name]
iq.pause!

x.publish_confirm "test message 2", q.name
Expand Down Expand Up @@ -187,7 +187,7 @@ describe LavinMQ::Queue do
x.publish_confirm "test message 3", q.name
x.publish_confirm "test message 4", q.name

internal_queue = s.vhosts["/"].exchanges[x_name].queue_bindings[LavinMQ::BindingKey.new(q.name, nil)].first
internal_queue = s.vhosts["/"].queues[q.name]
internal_queue.message_count.should eq 4

response = http.delete("/api/queues/%2f/#{q_name}/contents")
Expand All @@ -209,7 +209,7 @@ describe LavinMQ::Queue do
end

vhost = s.vhosts["/"]
internal_queue = vhost.exchanges[x_name].queue_bindings[LavinMQ::BindingKey.new(q.name, nil)].first
internal_queue = vhost.queues[q.name]
internal_queue.message_count.should eq 10

response = http.delete("/api/queues/%2f/#{q_name}/contents?count=5")
Expand All @@ -235,8 +235,7 @@ describe LavinMQ::Queue do
x.publish_confirm "test message #{i}", q.name
end

internal_queue = s.vhosts["/"].exchanges[x_name].queue_bindings[LavinMQ::BindingKey.new(q.name, nil)].first

internal_queue = s.vhosts["/"].queues[q.name]
internal_queue.message_count.should eq 10

channel = Channel(String).new(1)
Expand Down
6 changes: 3 additions & 3 deletions spec/upstream_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -432,18 +432,18 @@ describe LavinMQ::Federation::Upstream do
wait_for { upstream.links.first?.try &.state.running? }

upstream_q = upstream_vhost.queues.values.first
upstream_q.bindings.size.should eq queues.size
upstream_q.bindings.size.should eq queues.size + 1 # +1 for the default exchange
# Assert setup is correct
10.times do |i|
downstream_q = downstream_ch.queue("")
downstream_q.bind("downstream_ex", "after.link.#{i}")
queues << downstream_q
end
sleep 0.1.seconds
upstream_q.bindings.size.should eq queues.size
upstream_q.bindings.size.should eq queues.size + 1
queues.each &.delete
sleep 10.milliseconds
upstream_q.bindings.size.should eq 0
upstream_q.bindings.size.should eq 1
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/vhost_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ describe LavinMQ::VHost do
v.declare_queue("q", true, false)
s.vhosts["test"].bind_queue("q", "e", "q")
s.restart
s.vhosts["test"].exchanges["e"].queue_bindings[LavinMQ::BindingKey.new("q", nil)].size.should eq 1
s.vhosts["test"].exchanges["e"].bindings_details.first.destination.name.should eq "q"
end
end

Expand Down
24 changes: 14 additions & 10 deletions src/lavinmq/amqp/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ module LavinMQ
end
return unless valid_q_bind_unbind?(frame)

q = @vhost.queues.fetch(frame.queue_name, nil)
q = @vhost.queues[frame.queue_name]?
if q.nil?
send_not_found frame, "Queue '#{frame.queue_name}' not found"
elsif !@vhost.exchanges.has_key? frame.exchange_name
Expand All @@ -688,8 +688,12 @@ module LavinMQ
elsif queue_exclusive_to_other_client?(q)
send_resource_locked(frame, "Exclusive queue")
else
@vhost.apply(frame)
send AMQP::Frame::Queue::BindOk.new(frame.channel) unless frame.no_wait
begin
@vhost.apply(frame)
send AMQP::Frame::Queue::BindOk.new(frame.channel) unless frame.no_wait
rescue ex : LavinMQ::Exchange::AccessRefused
send_access_refused(frame, ex.message)
end
end
end

Expand All @@ -699,7 +703,7 @@ module LavinMQ
end
return unless valid_q_bind_unbind?(frame)

q = @vhost.queues.fetch(frame.queue_name, nil)
q = @vhost.queues[frame.queue_name]?
if q.nil?
# should return not_found according to spec but we make it idempotent
send AMQP::Frame::Queue::UnbindOk.new(frame.channel)
Expand All @@ -713,8 +717,12 @@ module LavinMQ
elsif queue_exclusive_to_other_client?(q)
send_resource_locked(frame, "Exclusive queue")
else
@vhost.apply(frame)
send AMQP::Frame::Queue::UnbindOk.new(frame.channel)
begin
@vhost.apply(frame)
send AMQP::Frame::Queue::UnbindOk.new(frame.channel)
rescue ex : LavinMQ::Exchange::AccessRefused
send_access_refused(frame, ex.message)
end
end
end

Expand All @@ -725,10 +733,6 @@ module LavinMQ
elsif !valid_entity_name(frame.exchange_name)
send_precondition_failed(frame, "Exchange name isn't valid")
return false
elsif frame.exchange_name.empty? || frame.exchange_name == DEFAULT_EX
target = frame.is_a?(AMQP::Frame::Queue::Bind) ? "bind to" : "unbind from"
send_access_refused(frame, "Not allowed to #{target} the default exchange")
return false
end
true
end
Expand Down
Loading

0 comments on commit 4ef9cc4

Please sign in to comment.