Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lavinmqctl on follower nodes #785

Merged
merged 7 commits into from
Sep 19, 2024
2 changes: 2 additions & 0 deletions src/lavinmq/clustering/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ module LavinMQ
@unix_amqp_proxy = Proxy.new(@config.unix_path) unless @config.unix_path.empty?
@unix_http_proxy = Proxy.new(@config.http_unix_path) unless @config.http_unix_path.empty?
end
HTTP::Server.follower_internal_socket_http_server

Signal::INT.trap { close_and_exit }
Signal::TERM.trap { close_and_exit }
end
Expand Down
20 changes: 18 additions & 2 deletions src/lavinmq/http/http_server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ module LavinMQ
Log = ::Log.for "http"

class Server
getter http

def initialize(@amqp_server : LavinMQ::Server)
handlers = [
StrictTransportSecurity.new,
Expand Down Expand Up @@ -64,9 +66,9 @@ module LavinMQ
addr
end

def bind_internal_unix
def self.bind_internal_unix(http)
File.delete?(INTERNAL_UNIX_SOCKET)
addr = @http.bind_unix(INTERNAL_UNIX_SOCKET)
addr = http.bind_unix(INTERNAL_UNIX_SOCKET)
File.chmod(INTERNAL_UNIX_SOCKET, 0o660)
Log.info { "Bound to #{addr}" }
addr
Expand All @@ -80,6 +82,20 @@ module LavinMQ
@http.try &.close
File.delete?(INTERNAL_UNIX_SOCKET)
end

# Starts a HTTP server that binds to the internal UNIX socket used by lavinmqctl.
# The server returns 503 to signal that the node is a follower and can not handle the request.
def self.follower_internal_socket_http_server
http_server = ::HTTP::Server.new do |context|
context.response.status_code = 503
context.response.status_message = "This node is a follower and does not handle HTTP requests."
end
self.bind_internal_unix(http_server)
viktorerlingsson marked this conversation as resolved.
Show resolved Hide resolved

spawn(name: "HTTP listener") do
http_server.listen
end
end
end
end
end
2 changes: 1 addition & 1 deletion src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ module LavinMQ
@http_server.bind_unix(@config.http_unix_path)
end

@http_server.bind_internal_unix
HTTP::Server.bind_internal_unix(@http_server.http)
spawn(name: "HTTP listener") do
@http_server.not_nil!.listen
end
Expand Down
5 changes: 5 additions & 0 deletions src/lavinmqctl.cr
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ class LavinMQCtl

private def handle_response(resp, *ok)
return if ok.includes? resp.status_code
if resp.status_code == 503
puts "[ERROR] This node is a follower and does not handle lavinmqctl commands."
viktorerlingsson marked this conversation as resolved.
Show resolved Hide resolved
puts "Please connect to the leader node by using the --host option."
exit 2
end
puts "#{resp.status_code} - #{resp.status}"
puts resp.body if resp.body? && !resp.headers["Content-Type"]?.try(&.starts_with?("text/html"))
exit 1
Expand Down
Loading