diff --git a/spec/upstream_spec.cr b/spec/upstream_spec.cr index b86a7fc53..b83bc660c 100644 --- a/spec/upstream_spec.cr +++ b/spec/upstream_spec.cr @@ -224,6 +224,37 @@ describe LavinMQ::Federation::Upstream do end end + it "should reconnect queue link after upstream disconnect" do + with_amqp_server do |s| + upstream, upstream_vhost, downstream_vhost = UpstreamSpecHelpers.setup_federation(s, "ef test upstream restart", nil, "upstream_q") + UpstreamSpecHelpers.start_link(upstream, "downstream_q", "queues") + s.users.add_permission("guest", "upstream", /.*/, /.*/, /.*/) + s.users.add_permission("guest", "downstream", /.*/, /.*/, /.*/) + + upstream_vhost.declare_exchange("upstream_ex", "topic", true, false) + upstream_vhost.declare_queue("upstream_q", true, false) + downstream_vhost.declare_exchange("downstream_ex", "topic", true, false) + downstream_vhost.declare_queue("downstream_q", true, false) + + wait_for { downstream_vhost.queues["downstream_q"].policy.try(&.name) == "FE" } + wait_for { upstream.links.first?.try &.state.running? } + + sleep 1.seconds + + # Disconnect the federation link + upstream_vhost.connections.each do |conn| + next unless conn.client_name.starts_with?("Federation link") + conn.close + end + + sleep 1.seconds + + # wait for federation link to be reconnected + wait_for { upstream.links.first?.try &.state.running? } + wait_for { upstream.links.first?.try { |l| l.@upstream_connection.try &.closed? == false } } + end + end + it "should continue after upstream restart" do with_amqp_server do |s| upstream, upstream_vhost, downstream_vhost = UpstreamSpecHelpers.setup_federation(s, "ef test upstream restart", "upstream_ex")