diff --git a/spec/clustering_spec.cr b/spec/clustering_spec.cr index 3b4849331..f6997d1e1 100644 --- a/spec/clustering_spec.cr +++ b/spec/clustering_spec.cr @@ -40,7 +40,6 @@ describe LavinMQ::Clustering::Client do end it "can stream changes" do - LavinMQ::Config.instance.clustering_min_isr = 1 replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0) tcp_server = TCPServer.new("localhost", 0) spawn(replicator.listen(tcp_server), name: "repli server spec") @@ -51,7 +50,7 @@ describe LavinMQ::Clustering::Client do repli.follow("localhost", tcp_server.local_address.port) done.send nil end - + wait_for { replicator.followers.size == 1 } with_amqp_server(replicator: replicator) do |s| with_channel(s) do |ch| q = ch.queue("repli") diff --git a/src/lavinmq/clustering/server.cr b/src/lavinmq/clustering/server.cr index 4033bab66..be1484d0e 100644 --- a/src/lavinmq/clustering/server.cr +++ b/src/lavinmq/clustering/server.cr @@ -28,7 +28,6 @@ module LavinMQ @lock = Mutex.new(:unchecked) @followers = Array(Follower).new(4) - @has_followers = Channel(Int32).new @password : String @files = Hash(String, MFile?).new @dirty_isr = true @@ -151,8 +150,6 @@ module LavinMQ @followers << follower update_isr end - while @has_followers.try_send? @followers.size - end begin follower.action_loop ensure @@ -190,16 +187,10 @@ module LavinMQ @followers.each &.close @followers.clear end - @has_followers.close Fiber.yield # required for follower/listener fibers to actually finish end private def each_follower(& : Follower -> Nil) : Nil - isr_count = @followers.size - until isr_count >= @config.clustering_min_isr - Log.warn { "ISR requirement not met (#{isr_count}/#{@config.clustering_min_isr})" } - isr_count = @has_followers.receive? || return - end @lock.synchronize do update_isr if @dirty_isr @followers.each do |f| diff --git a/src/lavinmq/config.cr b/src/lavinmq/config.cr index 3eafc64c8..30779b01a 100644 --- a/src/lavinmq/config.cr +++ b/src/lavinmq/config.cr @@ -56,7 +56,6 @@ module LavinMQ property clustering_bind = "127.0.0.1" property clustering_port = 5679 property clustering_max_lag = 8192 # number of clustering actions - property clustering_min_isr = 0 # number of In Sync Replicas required at all time property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file property consumer_timeout : UInt64? = nil property consumer_timeout_loop_interval = 60 # seconds @@ -137,9 +136,6 @@ module LavinMQ p.on("--clustering-max-lag=ACTIONS", "Max unsynced replicated messages") do |v| @clustering_max_lag = v.to_i end - p.on("--clustering-min-isr=COUNT", "Required in-sync-replicas") do |v| - @clustering_min_isr = v.to_i - end p.on("--clustering-etcd-endpoints=URIs", "Comma separeted host/port pairs (default: 127.0.0.1:2379)") do |v| @clustering_etcd_endpoints = v end @@ -246,7 +242,6 @@ module LavinMQ when "bind" then @clustering_bind = v when "port" then @clustering_port = v.to_i32 when "max_lag" then @clustering_max_lag = v.to_i32 - when "min_isr" then @clustering_min_isr = v.to_i32 else STDERR.puts "WARNING: Unrecognized configuration 'clustering/#{config}'" end