Skip to content

Commit

Permalink
add spec
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Sep 20, 2024
1 parent 08b0e9c commit a726357
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions spec/upstream_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,37 @@ describe LavinMQ::Federation::Upstream do
end
end

it "should reconnect queue link after upstream disconnect" do
with_amqp_server do |s|
upstream, upstream_vhost, downstream_vhost = UpstreamSpecHelpers.setup_federation(s, "ef test upstream restart", nil, "upstream_q")
UpstreamSpecHelpers.start_link(upstream, "downstream_q", "queues")
s.users.add_permission("guest", "upstream", /.*/, /.*/, /.*/)
s.users.add_permission("guest", "downstream", /.*/, /.*/, /.*/)

upstream_vhost.declare_exchange("upstream_ex", "topic", true, false)
upstream_vhost.declare_queue("upstream_q", true, false)
downstream_vhost.declare_exchange("downstream_ex", "topic", true, false)
downstream_vhost.declare_queue("downstream_q", true, false)

wait_for { downstream_vhost.queues["downstream_q"].policy.try(&.name) == "FE" }
wait_for { upstream.links.first?.try &.state.running? }

sleep 1.seconds

# Disconnect the federation link
upstream_vhost.connections.each do |conn|
next unless conn.client_name.starts_with?("Federation link")
conn.close
end

sleep 1.seconds

# wait for federation link to be reconnected
wait_for { upstream.links.first?.try &.state.running? }
wait_for { upstream.links.first?.try { |l| l.@upstream_connection.try &.closed? == false } }
end
end

it "should continue after upstream restart" do
with_amqp_server do |s|
upstream, upstream_vhost, downstream_vhost = UpstreamSpecHelpers.setup_federation(s, "ef test upstream restart", "upstream_ex")
Expand Down

0 comments on commit a726357

Please sign in to comment.