Remove min_isr (#789)
Setting the minimum number of in-sync replicas (min_isr) to a value greater than 0 blocks LavinMQ from starting with the current implementation. The reason is that LavinMQ performs some file actions on startup (cleanup, truncation, etc.; for example the actions taken here). With min_isr set, these actions need to be replicated to at least min_isr nodes before LavinMQ can continue. However, the listener for the replication server has not been started when these actions are taken, so no follower can connect to the leader and the actions cannot be replicated to the required number of nodes. The leader is left waiting for actions to be replicated to followers that are themselves waiting for the leader to accept connections.
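
The startup ordering behind this deadlock can be sketched in a few lines of Crystal. This is a toy model, not LavinMQ's code: `ToyReplicator`, `follower_connected`, `replicate` and `startup_completes?` are hypothetical names, and the real server gates replication inside `each_follower` (see the diff further down). The sketch only assumes that startup actions are replicated before the listener can register any follower.

```crystal
# Hypothetical stand-in for the replication server (not LavinMQ's class).
class ToyReplicator
  getter followers = 0

  def initialize(@min_isr : Int32)
    @has_followers = Channel(Int32).new
  end

  # Called by the listener when a follower connects.
  def follower_connected : Nil
    @followers += 1
    # Wake a fiber waiting in #replicate; do nothing if none is waiting.
    select
    when @has_followers.send(@followers)
      nil
    else
      nil
    end
  end

  # Blocks until at least min_isr followers are connected.
  def replicate(action : String) : Nil
    isr_count = @followers
    until isr_count >= @min_isr
      isr_count = @has_followers.receive
    end
  end
end

# Runs the assumed startup order and reports whether it finished in time.
def startup_completes?(min_isr : Int32, wait : Time::Span) : Bool
  replicator = ToyReplicator.new(min_isr)
  done = Channel(Nil).new

  spawn do
    # Step 1: startup file actions must be replicated first.
    replicator.replicate("truncate")
    # Step 2: only now does the listener start and a follower connect.
    replicator.follower_connected
    done.send nil
  end

  select
  when done.receive
    true
  when timeout(wait)
    false
  end
end

puts startup_completes?(0, 100.milliseconds) # min_isr = 0: no gate to wait on
puts startup_completes?(1, 100.milliseconds) # min_isr = 1: step 2 is never reached
```

With `min_isr = 1`, step 1 waits for a follower that can only appear after step 2, so the fiber never gets past `replicate`.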

Using clustering without min_isr works the same way as if min_isr were set to 0: the leader does not wait for followers to connect or reconnect before accepting connections and data. If followers are connected and in sync, they are kept in sync for as long as possible. But if a follower disconnects, the leader node will not wait for it to come back online before accepting new connections and data. This means that it is possible to have data that isn't synced to any follower.
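
As a rough illustration of that trade-off, the following Crystal sketch models a leader that forwards each write to whichever followers happen to be connected and never waits for one. `ToyLeader` and `publish` are hypothetical names for this example only, and followers are modelled as plain in-memory arrays rather than network connections.

```crystal
# Hypothetical toy model; class and method names are not LavinMQ's.
class ToyLeader
  getter log = [] of String
  getter followers = [] of Array(String)

  # Append locally, then forward to the currently connected followers.
  # With no followers the block never runs and the write still succeeds.
  def publish(msg : String) : Nil
    @log << msg
    @followers.each { |f| f << msg }
  end
end

leader = ToyLeader.new
follower = [] of String
leader.followers << follower

leader.publish("m1")              # forwarded to the connected follower
leader.followers.delete(follower) # follower disconnects
leader.publish("m2")              # accepted at once, forwarded to nobody

# Entries that exist on the leader but on no follower.
unsynced = leader.log - follower
puts "unsynced: #{unsynced}"
```

In this model `m2` exists only on the leader, which is the "data that isn't synced to any follower" case described above.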

Several workarounds for this were taken into consideration, but we deemed none of them good enough, which is why we decided to remove min_isr for now, until we have a better idea of how to implement it.

WHAT is this pull request doing?

Removes settings and code related to min_isr.

The current implementation has been saved as a backup in https://github.com/cloudamqp/lavinmq/tree/min_isr_backup.
viktorerlingsson authored Oct 1, 2024
1 parent f0c8695 commit ddf5871
Showing 3 changed files with 1 addition and 16 deletions.
3 changes: 1 addition & 2 deletions spec/clustering_spec.cr
@@ -40,7 +40,6 @@ describe LavinMQ::Clustering::Client do
end

it "can stream changes" do
- LavinMQ::Config.instance.clustering_min_isr = 1
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
tcp_server = TCPServer.new("localhost", 0)
spawn(replicator.listen(tcp_server), name: "repli server spec")
@@ -51,7 +50,7 @@ describe LavinMQ::Clustering::Client do
repli.follow("localhost", tcp_server.local_address.port)
done.send nil
end

wait_for { replicator.followers.size == 1 }
with_amqp_server(replicator: replicator) do |s|
with_channel(s) do |ch|
q = ch.queue("repli")
9 changes: 0 additions & 9 deletions src/lavinmq/clustering/server.cr
@@ -28,7 +28,6 @@ module LavinMQ

@lock = Mutex.new(:unchecked)
@followers = Array(Follower).new(4)
- @has_followers = Channel(Int32).new
@password : String
@files = Hash(String, MFile?).new
@dirty_isr = true
@@ -151,8 +150,6 @@ module LavinMQ
@followers << follower
update_isr
end
- while @has_followers.try_send? @followers.size
- end
begin
follower.action_loop
ensure
@@ -190,16 +187,10 @@ module LavinMQ
@followers.each &.close
@followers.clear
end
- @has_followers.close
Fiber.yield # required for follower/listener fibers to actually finish
end

private def each_follower(& : Follower -> Nil) : Nil
- isr_count = @followers.size
- until isr_count >= @config.clustering_min_isr
- Log.warn { "ISR requirement not met (#{isr_count}/#{@config.clustering_min_isr})" }
- isr_count = @has_followers.receive? || return
- end
@lock.synchronize do
update_isr if @dirty_isr
@followers.each do |f|
5 changes: 0 additions & 5 deletions src/lavinmq/config.cr
@@ -56,7 +56,6 @@ module LavinMQ
property clustering_bind = "127.0.0.1"
property clustering_port = 5679
property clustering_max_lag = 8192 # number of clustering actions
- property clustering_min_isr = 0 # number of In Sync Replicas required at all time
property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file
property consumer_timeout : UInt64? = nil
property consumer_timeout_loop_interval = 60 # seconds
@@ -137,9 +136,6 @@ module LavinMQ
p.on("--clustering-max-lag=ACTIONS", "Max unsynced replicated messages") do |v|
@clustering_max_lag = v.to_i
end
- p.on("--clustering-min-isr=COUNT", "Required in-sync-replicas") do |v|
- @clustering_min_isr = v.to_i
- end
p.on("--clustering-etcd-endpoints=URIs", "Comma separeted host/port pairs (default: 127.0.0.1:2379)") do |v|
@clustering_etcd_endpoints = v
end
@@ -246,7 +242,6 @@ module LavinMQ
when "bind" then @clustering_bind = v
when "port" then @clustering_port = v.to_i32
when "max_lag" then @clustering_max_lag = v.to_i32
- when "min_isr" then @clustering_min_isr = v.to_i32
else
STDERR.puts "WARNING: Unrecognized configuration 'clustering/#{config}'"
end