Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication #506

Merged
merged 1 commit into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,41 @@ There are a few edge-cases that are handled a bit differently in LavinMQ compare
- TTL of queues and messages are correct to the 0.1 second, not to the millisecond
- Newlines are not removed from Queue or Exchange names, they are forbidden

## Replication

LavinMQ supports replication between a leader server and one or more followers. All changes on the leader are replicated to the followers.

### Replication configuration

A shared secret is used to allow nodes in a cluster to communicate. Make sure that the `.replication_secret` file is the same in the data directories of all nodes.

Then enable the replication listener on the leader:

```ini
[replication]
bind = 0.0.0.0
port = 5679
```

or start LavinMQ with:

```sh
lavinmq --data-dir /var/lib/lavinmq --replication-bind 0.0.0.0 --replication-port 5679
```

Configure the follower(s) to connect to the leader:

```ini
[replication]
follow = tcp://hostname:port
```

or start LavinMQ with:

```sh
lavinmq --data-dir /var/lib/lavinmq-follower --follow tcp://leader.example.com:5679
```

## Contributors

- [Carl Hörberg](mailto:[email protected])
Expand Down
61 changes: 61 additions & 0 deletions spec/replication_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
require "./spec_helper"
require "../src/lavinmq/replication/client"

# Specs for the replication client (follower side). Each example replicates
# into a scratch follower data directory and then boots a separate
# LavinMQ::Server on that directory to inspect what was replicated.
describe LavinMQ::Replication::Client do
# Scratch data directory used by the follower in these examples
data_dir = "/tmp/lavinmq-follower"

before_each do
Dir.mkdir_p data_dir
# Write the `.replication_secret` file into the follower directory using the
# password held by `Server`'s replicator, with permissions 0o400.
# NOTE(review): `Server` is not defined in this file; presumably it is the
# leader instance provided by spec_helper - confirm.
File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400
end

after_each do
# Remove the follower directory so each example starts from scratch
FileUtils.rm_rf data_dir
end

it "can synchronize" do
# Publish a confirmed message to queue "repli" before the follower syncs
with_channel do |ch|
q = ch.queue("repli")
q.publish_confirm "hello world"
end
# Sync against the replication port on localhost, then close the client
repli = LavinMQ::Replication::Client.new(data_dir)
repli.sync("127.0.0.1", LavinMQ::Config.instance.replication_port)
repli.close

# Start a server on the follower directory and verify the message is there
server = LavinMQ::Server.new(data_dir)
begin
q = server.vhosts["/"].queues["repli"].as(LavinMQ::DurableQueue)
q.basic_get(true) do |env|
String.new(env.message.body).to_s.should eq "hello world"
end.should be_true
ensure
server.close
end
end

it "can stream changes" do
# Channel used to wait for the follow fiber to finish
done = Channel(Nil).new
repli = LavinMQ::Replication::Client.new(data_dir)
# Run `follow` in its own fiber; `done` is signalled once `follow` returns
spawn do
repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
done.send nil
end
# Publish after the follow fiber has been spawned
with_channel do |ch|
q = ch.queue("repli")
q.publish_confirm "hello world"
end
# presumably gives the follower time to receive the change before closing
# - TODO confirm that 0.1s is sufficient on slow CI machines
sleep 0.1
repli.close
done.receive

# Start a server on the follower directory and verify the message is there
server = LavinMQ::Server.new(data_dir)
begin
q = server.vhosts["/"].queues["repli"].as(LavinMQ::DurableQueue)
q.basic_get(true) do |env|
String.new(env.message.body).to_s.should eq "hello world"
end.should be_true
ensure
server.close
end
end
end
16 changes: 9 additions & 7 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
require "spec"
require "file_utils"
require "amqp-client"
require "../src/lavinmq/config"
require "../src/lavinmq/server"
require "../src/lavinmq/log_formatter"
require "../src/lavinmq/http/http_server"
require "http/client"
require "socket"
require "uri"
require "amqp-client"

Log.setup_from_env

Expand All @@ -21,13 +16,19 @@ HTTP_PORT = ENV.fetch("HTTP_PORT", "8080").to_i
BASE_URL = "http://localhost:#{HTTP_PORT}"
DATA_DIR = "/tmp/lavinmq-spec"

Dir.mkdir_p DATA_DIR
LavinMQ::Config.instance.tap do |cfg|
cfg.segment_size = 512 * 1024
cfg.data_dir = DATA_DIR
cfg.amqp_port = AMQP_PORT
cfg.amqps_port = AMQPS_PORT
cfg.http_port = HTTP_PORT
cfg.segment_size = 512 * 1024
end

# have to be required after config
require "../src/lavinmq/server"
require "../src/lavinmq/http/http_server"

def with_channel(**args, &)
name = nil
if formatter = Spec.formatters[0].as?(Spec::VerboseFormatter)
Expand Down Expand Up @@ -86,6 +87,7 @@ end
def start_amqp_server
s = LavinMQ::Server.new(DATA_DIR)
spawn { s.listen(LavinMQ::Config.instance.amqp_bind, LavinMQ::Config.instance.amqp_port) }
spawn { s.listen_replication("127.0.0.1", LavinMQ::Config.instance.replication_port) }
ctx = OpenSSL::SSL::Context::Server.new
ctx.certificate_chain = "spec/resources/server_certificate.pem"
ctx.private_key = "spec/resources/server_key.pem"
Expand Down
11 changes: 10 additions & 1 deletion src/lavinmq.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,13 @@ LavinMQ::ServerCLI.new(config).parse

# config has to be loaded before we require vhost/queue, byte_format is a constant
require "./lavinmq/launcher"
LavinMQ::Launcher.new(config).run # will block
require "./lavinmq/replication/client"
# When `replication_follow` is configured, run as a follower: start a
# replication client on the data directory and follow the given URI instead
# of launching the regular server.
if uri = config.replication_follow
begin
LavinMQ::Replication::Client.new(config.data_dir).follow(uri)
rescue ex : ArgumentError
# NOTE(review): presumably raised for an invalid follow URI - confirm.
# Exit with the error message.
abort ex.message
end
else
# No leader configured: start the regular launcher
LavinMQ::Launcher.new(config).run # will block
end
26 changes: 20 additions & 6 deletions src/lavinmq/config.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "log"
require "uri"

module LavinMQ
class Config
Expand Down Expand Up @@ -43,6 +44,9 @@ module LavinMQ
property? log_exchange : Bool = false
property free_disk_min : Int64 = 0 # bytes
property free_disk_warn : Int64 = 0 # bytes
property replication_follow : URI? = nil
property replication_bind : String? = nil
property replication_port = 5679
@@instance : Config = self.new

def self.instance : LavinMQ::Config
Expand All @@ -58,12 +62,10 @@ module LavinMQ
ini = INI.parse(File.read(file))
ini.each do |section, settings|
case section
when "main"
parse_main(settings)
when "amqp"
parse_amqp(settings)
when "mgmt", "http"
parse_mgmt(settings)
when "main" then parse_main(settings)
when "amqp" then parse_amqp(settings)
when "mgmt", "http" then parse_mgmt(settings)
when "replication" then parse_replication(settings)
else
raise "Unrecognized config section: #{section}"
end
Expand Down Expand Up @@ -105,6 +107,18 @@ module LavinMQ
end
end

# Applies the settings of the `[replication]` config section.
#
# Recognized keys:
# - `follow`: parsed as a URI and stored in `@replication_follow`
# - `bind`: stored as-is in `@replication_bind`
# - `port`: converted with `to_i32` and stored in `@replication_port`
#
# Any other key only produces a warning on STDERR.
private def parse_replication(settings)
  settings.each do |config, value|
    case config
    when "follow"
      @replication_follow = URI.parse(value)
    when "bind"
      @replication_bind = value
    when "port"
      @replication_port = value.to_i32
    else
      STDERR.puts "WARNING: Unrecognized configuration 'replication/#{config}'"
    end
  end
end

# ameba:disable Metrics/CyclomaticComplexity
private def parse_amqp(settings)
settings.each do |config, v|
Expand Down
46 changes: 46 additions & 0 deletions src/lavinmq/data_dir_lock.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
module LavinMQ
  # Makes sure that only one instance at a time is using the data directory,
  # by holding an exclusive file lock on the `.lock` file inside it.
  # Can work as a poor man's cluster, where the master node is the one that
  # acquires the file lock on a shared file system like NFS.
  class DataDirLock
    Log = ::Log.for(self)

    # Opens the `.lock` file in *data_dir* in "a+" mode, with sync enabled and
    # read buffering disabled. The lock is not taken here, see `#acquire`.
    def initialize(data_dir)
      @lock = File.open(File.join(data_dir, ".lock"), "a+")
      @lock.sync = true
      @lock.read_buffering = false
    end

    # Takes the exclusive lock, waiting for it if the non-blocking attempt
    # fails, then replaces the content of the lock file with this host's name.
    def acquire
      wait_for_lock unless try_lock
      @lock.truncate
      @lock.print System.hostname
      @lock.fsync
    end

    # Empties the lock file, syncs it and releases the file lock.
    def release
      @lock.truncate
      @lock.fsync
      @lock.flock_unlock
    end

    # Read from the lock file to detect a lost lock.
    # See "Lost locks" in `man 2 fcntl`.
    #
    # Loops forever: triggers a GC run, sleeps 30 seconds and then reads the
    # first byte of the lock file. An empty file, an IO error or an argument
    # error aborts the process.
    def poll
      loop do
        GC.collect
        sleep 30
        first_byte = @lock.read_at(0, 1) { |io| io.read_byte }
        raise IO::EOFError.new if first_byte.nil?
      end
    rescue ex : IO::Error | ArgumentError
      abort "ERROR: Lost data directory lock! #{ex.inspect}"
    end

    # Tries to take the lock without blocking; any exception counts as failure.
    private def try_lock : Bool
      @lock.flock_exclusive(blocking: false)
      true
    rescue
      false
    end

    # Logs the content of the lock file (written by the current holder in
    # `#acquire`) and then blocks until the lock can be taken.
    private def wait_for_lock
      Log.info { "Data directory locked by '#{@lock.gets_to_end}'" }
      Log.info { "Waiting for file lock to be released" }
      @lock.flock_exclusive(blocking: true)
      Log.info { "Lock acquired" }
    end
  end
end
1 change: 1 addition & 0 deletions src/lavinmq/http/controller/nodes.cr
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ module LavinMQ
proc_used: Fiber.count,
run_queue: 0,
sockets_used: @amqp_server.vhosts.sum { |_, v| v.connections.size },
followers: @amqp_server.followers,
}
end

Expand Down
58 changes: 17 additions & 41 deletions src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ require "./server"
require "./http/http_server"
require "./log_formatter"
require "./in_memory_backend"
require "./data_dir_lock"

module LavinMQ
class Launcher
Log = ::Log.for "launcher"
@tls_context : OpenSSL::SSL::Context::Server?
@first_shutdown_attempt = true
@lock_file : File?
@data_dir_lock : DataDirLock?

def initialize(@config : LavinMQ::Config)
reload_logger
Expand All @@ -21,7 +22,9 @@ module LavinMQ
print_max_map_count
Launcher.maximize_fd_limit
Dir.mkdir_p @config.data_dir
@lock_file = acquire_lock if @config.data_dir_lock?
if @config.data_dir_lock?
@data_dir_lock = DataDirLock.new(@config.data_dir).tap &.acquire
end
@amqp_server = LavinMQ::Server.new(@config.data_dir)
@http_server = LavinMQ::HTTP::Server.new(@amqp_server)
@tls_context = create_tls_context if @config.tls_configured?
Expand All @@ -33,11 +36,13 @@ module LavinMQ
def run
listen
SystemD.notify_ready
hostname = System.hostname.to_slice
loop do
GC.collect
sleep 10
@lock_file.try &.write_at(hostname, 0)
if lock = @data_dir_lock
lock.poll
else
loop do
GC.collect
sleep 30
end
end
end

Expand Down Expand Up @@ -123,6 +128,10 @@ module LavinMQ
end
end

if replication_bind = @config.replication_bind
spawn @amqp_server.listen_replication(replication_bind, @config.replication_port), name: "Replication listener"
end

unless @config.unix_path.empty?
spawn @amqp_server.listen_unix(@config.unix_path), name: "AMQP listening at #{@config.unix_path}"
end
Expand Down Expand Up @@ -191,7 +200,7 @@ module LavinMQ
Log.info { "Shutting down gracefully..." }
@http_server.close
@amqp_server.close
@lock_file.try &.close
@data_dir_lock.try &.release
Log.info { "Fibers: " }
Fiber.list { |f| Log.info { f.inspect } }
exit 0
Expand All @@ -211,39 +220,6 @@ module LavinMQ
Signal::SEGV.reset # Let the OS generate a coredump
end

# Make sure that only one instance is using the data directory
# Can work as a poor mans cluster where the master nodes aquires
# a file lock on a shared file system like NFS
private def acquire_lock : File
lock = File.open(File.join(@config.data_dir, ".lock"), "w+")
lock.sync = true
lock.read_buffering = false
begin
lock.flock_exclusive(blocking: false)
rescue
Log.info { "Data directory locked by '#{lock.gets_to_end}'" }
Log.info { "Waiting for file lock to be released" }
lock.flock_exclusive(blocking: true)
Log.info { "Lock acquired" }
end
lock.truncate
lock.print System.hostname
lock.fsync
lock
end

# write to the lock file to detect lost lock
# See "Lost locks" in `man 2 fcntl`
private def hold_lock(lock)
hostname = System.hostname.to_slice
loop do
sleep 30
lock.write_at hostname, 0
end
rescue ex : IO::Error
abort "ERROR: Lost lock! #{ex.inspect}"
end

private def create_tls_context
context = OpenSSL::SSL::Context::Server.new
context.add_options(OpenSSL::SSL::Options.new(0x40000000)) # disable client initiated renegotiation
Expand Down
Loading
Loading