From 43a622df88e49f51bf17d0d656fad9a6d6a54c2e Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Thu, 5 Oct 2023 12:06:13 +0200 Subject: [PATCH 01/12] Upgrade ruby to v3.2 as 2.7 is not maintained anymore. --- .github/workflows/tests.yml | 2 +- .github/workflows/tests_5.7.yml | 2 +- Gemfile | 1 + Gemfile.lock | 20 +++++++++++--------- Makefile | 2 +- dev.yml | 2 +- test/test_helper.rb | 2 +- 7 files changed, 17 insertions(+), 14 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1cd54270..6d0fdce6 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -63,7 +63,7 @@ jobs: - name: Setup Ruby uses: ruby/setup-ruby@v1 with: - ruby-version: 2.7 + ruby-version: 3.2 bundler-cache: true - name: Starting up MySQL diff --git a/.github/workflows/tests_5.7.yml b/.github/workflows/tests_5.7.yml index 62aa65b3..19eb45c6 100644 --- a/.github/workflows/tests_5.7.yml +++ b/.github/workflows/tests_5.7.yml @@ -63,7 +63,7 @@ jobs: - name: Setup Ruby uses: ruby/setup-ruby@v1 with: - ruby-version: 2.7 + ruby-version: 3.2 bundler-cache: true - name: Starting up MySQL diff --git a/Gemfile b/Gemfile index 3ef9386a..e404666d 100644 --- a/Gemfile +++ b/Gemfile @@ -3,6 +3,7 @@ source "https://rubygems.org" group :test do gem "minitest" gem "mysql2" + gem "webrick" gem "minitest-hooks" gem "minitest-reporters", "~> 1.4" diff --git a/Gemfile.lock b/Gemfile.lock index d4898b6d..42ce55a5 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -6,25 +6,26 @@ GEM byebug (11.1.3) coderay (1.1.3) method_source (1.0.0) - minitest (5.14.4) + minitest (5.20.0) minitest-fail-fast (0.1.0) minitest (~> 5) - minitest-hooks (1.5.0) + minitest-hooks (1.5.1) minitest (> 5.3) - minitest-reporters (1.4.3) + minitest-reporters (1.6.1) ansi builder minitest (>= 5.0) ruby-progressbar - mysql2 (0.5.3) - pry (0.13.1) + mysql2 (0.5.5) + pry (0.14.2) coderay (~> 1.1) method_source (~> 1.0) - pry-byebug (3.9.0) + pry-byebug (3.10.1) byebug (~> 11.0) - pry (~> 
0.13.0) - ruby-progressbar (1.11.0) - tqdm (0.3.0) + pry (>= 0.13, < 0.15) + ruby-progressbar (1.13.0) + tqdm (0.4.1) + webrick (1.8.1) PLATFORMS ruby @@ -37,6 +38,7 @@ DEPENDENCIES mysql2 pry-byebug tqdm + webrick BUNDLED WITH 2.2.22 diff --git a/Makefile b/Makefile index 4dc22e85..29416f0d 100644 --- a/Makefile +++ b/Makefile @@ -67,7 +67,7 @@ test-go: test-ruby: bundle install - ruby test/main.rb + bundle exec ruby test/main.rb test: test-go test-ruby diff --git a/dev.yml b/dev.yml index 283830b5..d533c53e 100644 --- a/dev.yml +++ b/dev.yml @@ -9,7 +9,7 @@ up: or: [mysql@5.7] conflicts: [mysql-connector-c, mysql, mysql-client] - - ruby: "2.7.3" + - ruby: "3.2.2" - bundler - go: version: "1.16" diff --git a/test/test_helper.rb b/test/test_helper.rb index 59427bb2..626bbcad 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -55,7 +55,7 @@ def new_ghostferry(filepath, config: {}) end def new_ghostferry_with_interrupt_after_row_copy(filepath, config: {}, after_batches_written: 0) - g = new_ghostferry(filepath, config) + g = new_ghostferry(filepath, config: config) batches_written = 0 g.on_status(Ghostferry::Status::AFTER_ROW_COPY) do From 13ea4d482cba7c5dc60410f5600fb852ba401a4b Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Thu, 5 Oct 2023 12:46:55 +0200 Subject: [PATCH 02/12] Use monotonic clock for duration --- test/helpers/ghostferry_helper.rb | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 4b9295bc..79036805 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -156,7 +156,7 @@ def compile_binary def start_server @server_last_error = nil - @last_message_time = Time.now + @last_message_time = now @server = WEBrick::HTTPServer.new( BindAddress: "127.0.0.1", Port: @server_port, @@ -185,7 +185,7 @@ def start_server status = status.first - @last_message_time = Time.now + @last_message_time = now 
@status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? rescue StandardError => e # errors are not reported from WEBrick but the server should fail early @@ -328,7 +328,7 @@ def start_server_watchdog # HTTP server to free up the port. @server_watchdog_thread = Thread.new do while @subprocess_thread.alive? do - if Time.now - @last_message_time > @message_timeout + if (now - @last_message_time) > @message_timeout @server.shutdown raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s" end @@ -405,5 +405,9 @@ def with_env(key, value) ensure ENV[key] = previous_value end + + def now + ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + end end end From b89b70a7fa51db16101454ebcdae1dbaa2fe0845 Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Thu, 5 Oct 2023 13:30:57 +0200 Subject: [PATCH 03/12] There's a possibility to have more than one status if it's an array --- test/helpers/ghostferry_helper.rb | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 79036805..f2cfa84f 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -174,19 +174,22 @@ def start_server query = CGI::parse(req.body) - status = query["status"] + statuses = query["status"] data = query["data"] - unless status + unless statuses @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status") resp.status = 400 @server.shutdown end - status = status.first - @last_message_time = now - @status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? + + statuses.each do |status| + next if @status_handlers[status].nil? 
+ + @status_handlers[status].each { |f| f.call(*data) } + end rescue StandardError => e # errors are not reported from WEBrick but the server should fail early # as this indicates there is likely a programming error. From 9822b6eb779aef6c6149ca1988d810f36717499e Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Mon, 9 Oct 2023 15:37:07 +0200 Subject: [PATCH 04/12] Debug received statuses --- test/helpers/ghostferry_helper.rb | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index f2cfa84f..a7c8a3e1 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -174,17 +174,21 @@ def start_server query = CGI::parse(req.body) - statuses = query["status"] - data = query["data"] + statuses = Array(query["status"]) - unless statuses + if statuses.empty? @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status") resp.status = 400 @server.shutdown + elsif statuses.size > 1 + @logger.warn("Got multiple statuses at once: #{statuses.inspect}") + puts "Got multiple statuses at once: #{statuses.inspect}" end @last_message_time = now + data = query["data"] + statuses.each do |status| next if @status_handlers[status].nil? From d575e5f0c3d989451e1b461ff08885f61ca3a210 Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Mon, 9 Oct 2023 16:00:16 +0200 Subject: [PATCH 05/12] Add more logging --- test/helpers/ghostferry_helper.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index a7c8a3e1..b099888b 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -337,6 +337,7 @@ def start_server_watchdog while @subprocess_thread.alive? 
do if (now - @last_message_time) > @message_timeout @server.shutdown + @logger&.print_output raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s" end From 6d41d77085cd62ece2f3e5b4a50a5c7245944f7d Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Mon, 9 Oct 2023 16:31:30 +0200 Subject: [PATCH 06/12] Use capturer --- test/helpers/ghostferry_helper.rb | 11 ++++------- test/test_helper.rb | 2 +- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index b099888b..af70b074 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -49,12 +49,9 @@ module Status attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines - def initialize(main_path, config: {}, logger: nil, message_timeout: 30, port: 39393) - @logger = logger - if @logger.nil? - @logger = Logger.new(STDOUT) - @logger.level = Logger::DEBUG - end + def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: 39393) + @log_capturer = log_capturer + @logger = log_capturer.logger @main_path = main_path @config = config @@ -337,7 +334,7 @@ def start_server_watchdog while @subprocess_thread.alive? 
do if (now - @last_message_time) > @message_timeout @server.shutdown - @logger&.print_output + @log_capturer.print_output raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s" end diff --git a/test/test_helper.rb b/test/test_helper.rb index 626bbcad..d66c4194 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -49,7 +49,7 @@ class GhostferryTestCase < Minitest::Test def new_ghostferry(filepath, config: {}) # Transform path to something ruby understands path = File.join(GO_CODE_PATH, filepath, "main.go") - g = Ghostferry.new(path, config: config, logger: @log_capturer.logger) + g = Ghostferry.new(path, config: config, log_capturer: @log_capturer) @ghostferry_instances << g g end From 8159121f5310bfa68cc337de4132b5d95ade6dde Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Tue, 10 Oct 2023 16:26:35 +0200 Subject: [PATCH 07/12] Bump MaxClients in WEBrick --- test/helpers/ghostferry_helper.rb | 1 + test/integration/interrupt_resume_test.rb | 10 +--------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index af70b074..236a19fd 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -158,6 +158,7 @@ def start_server BindAddress: "127.0.0.1", Port: @server_port, Logger: @logger, + MaxClients: 1024, AccessLog: [], ) diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index 4af99294..1cbf8c96 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -50,18 +50,10 @@ def test_interrupt_and_resume_without_last_known_schema_cache def test_interrupt_resume_with_writes_to_source # Start a ghostferry run expecting it to be interrupted. 
datawriter = new_source_datawriter - ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) start_datawriter_with_ghostferry(datawriter, ghostferry) - batches_written = 0 - ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do - batches_written += 1 - if batches_written >= 2 - ghostferry.send_signal("TERM") - end - end - dumped_state = ghostferry.run_expecting_interrupt assert_basic_fields_exist_in_dumped_state(dumped_state) From 122b61c275059ebef6d78975794d8ecaecfb247b Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Tue, 10 Oct 2023 17:14:51 +0200 Subject: [PATCH 08/12] Moar debugging --- test/helpers/ghostferry_helper.rb | 43 +++++++++++++++-------- test/integration/interrupt_resume_test.rb | 9 +++++ test/test_helper.rb | 15 ++++++-- 3 files changed, 51 insertions(+), 16 deletions(-) diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 236a19fd..523459ee 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -6,6 +6,7 @@ require "tmpdir" require "webrick" require "cgi" +require "securerandom" module GhostferryHelper GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration") @@ -47,11 +48,12 @@ module Status AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY" end - attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines + attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines, :tag def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: 39393) @log_capturer = log_capturer @logger = log_capturer.logger + @tag = SecureRandom.hex[0..3] @main_path = main_path @config = config @@ -94,6 +96,7 @@ def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: # The main method to call to run a Ghostferry subprocess. 
def run(resuming_state = nil) + @logger.info("[#{@tag}] ghostferry#run(state:#{(!resuming_state.nil?).inspect})") resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) compile_binary @@ -112,21 +115,26 @@ def run(resuming_state = nil) # When using this method, you need to ensure that the datawriter has been # stopped properly (if you're using stop_datawriter_during_cutover). def run_expecting_interrupt(resuming_state = nil) + @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt(state:#{(!resuming_state.nil?).inspect})") run(resuming_state) rescue GhostferryExitFailure + @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt: got GhostferryExitFailure") dumped_state = @stdout.join("") JSON.parse(dumped_state) else + @logger.error("[#{@tag}] ghostferry#run_expecting_interrupt: something's wrong") raise "Ghostferry did not get interrupted" end # Same as above - ensure that the datawriter has been # stopped properly (if you're using stop_datawriter_during_cutover). 
def run_expecting_failure(resuming_state = nil) + @logger.info("[#{@tag}] ghostferry#run_expecting_failure(state:#{(!resuming_state.nil?).inspect})") run(resuming_state) rescue GhostferryExitFailure + @logger.info("[#{@tag}] ghostferry#run_expecting_failure: got GhostferryExitFailure") else - raise "Ghostferry did not fail" + raise "[#{@tag}] Ghostferry did not fail" end def run_with_logs(resuming_state = nil) @@ -140,14 +148,14 @@ def run_with_logs(resuming_state = nil) def compile_binary return if File.exist?(@compiled_binary_path) - @logger.debug("compiling test binary to #{@compiled_binary_path}") + @logger.debug("[#{@tag}] compiling test binary to #{@compiled_binary_path}") rc = system( "go", "build", "-o", @compiled_binary_path, @main_path ) - raise "could not compile ghostferry" unless rc + raise "[#{@tag}] could not compile ghostferry" unless rc end def start_server @@ -179,7 +187,7 @@ def start_server resp.status = 400 @server.shutdown elsif statuses.size > 1 - @logger.warn("Got multiple statuses at once: #{statuses.inspect}") + @logger.warn("[#{@tag}] Got multiple statuses at once: #{statuses.inspect}") puts "Got multiple statuses at once: #{statuses.inspect}" end @@ -187,6 +195,7 @@ def start_server data = query["data"] + @logger.info("[#{@tag}] server: got / with #{statuses.inspect}") statuses.each do |status| next if @status_handlers[status].nil? 
@@ -202,6 +211,7 @@ def start_server @server.mount_proc "/callbacks/progress" do |req, resp| begin + @logger.info("[#{@tag}] server: got /callbacks/progress") unless req.body @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data") resp.status = 400 @@ -216,6 +226,7 @@ def start_server @server.mount_proc "/callbacks/state" do |req, resp| begin + @logger.info("[#{@tag}] server: got /callbacks/state") unless req.body @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data") resp.status = 400 @@ -228,14 +239,15 @@ def start_server end @server.mount_proc "/callbacks/error" do |req, resp| + @logger.info("[#{@tag}] server: got /callbacks/error") @error = JSON.parse(JSON.parse(req.body)["Payload"]) @callback_handlers["error"].each { |f| f.call(@error) } unless @callback_handlers["error"].nil? end @server_thread = Thread.new do - @logger.debug("starting server thread") + @logger.debug("[#{@tag}] starting server thread") @server.start - @logger.debug("server thread stopped") + @logger.debug("[#{@tag}] server thread stopped") end end @@ -275,7 +287,7 @@ def start_ghostferry(resuming_state = nil) environment["GHOSTFERRY_MARGINALIA"] = @config[:marginalia] end - @logger.debug("starting ghostferry test binary #{@compiled_binary_path}") + @logger.debug("[#{@tag}] starting ghostferry test binary #{@compiled_binary_path}") Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr| stdin.puts(resuming_state) unless resuming_state.nil? 
stdin.close @@ -297,7 +309,7 @@ def start_ghostferry(resuming_state = nil) if reader == stdout @stdout << line - @logger.debug("stdout: #{line}") + @logger.debug("[#{@tag}] stdout: #{line}") elsif reader == stderr @stderr << line if json_log_line?(line) @@ -310,8 +322,11 @@ def start_ghostferry(resuming_state = nil) if logline["level"] == "error" @error_lines << logline end + + @logger.debug("[#{@tag}] stderr: #{line}") unless tag.end_with?("_binlog_streamer") + else + @logger.debug("[#{@tag}] stderr: #{line}") end - @logger.debug("stderr: #{line}") end end end @@ -320,9 +335,9 @@ def start_ghostferry(resuming_state = nil) @pid = 0 end - @logger.debug("ghostferry test binary exitted: #{@exit_status}") + @logger.debug("[#{@tag}] ghostferry test binary exitted: #{@exit_status}") if @exit_status.exitstatus != 0 - raise GhostferryExitFailure, "ghostferry test binary returned non-zero status: #{@exit_status}" + raise GhostferryExitFailure, "[#{@tag}] ghostferry test binary returned non-zero status: #{@exit_status}" end end end @@ -336,14 +351,14 @@ def start_server_watchdog if (now - @last_message_time) > @message_timeout @server.shutdown @log_capturer.print_output - raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s" + raise "[#{@tag}] ghostferry did not report to the integration test server for the last #{@message_timeout}s" end sleep 1 end @server.shutdown - @logger.debug("server watchdog thread stopped") + @logger.debug("[#{@tag}] server watchdog thread stopped") end @server_watchdog_thread.abort_on_exception = true diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index 1cbf8c96..b6baccc1 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -11,6 +11,7 @@ def test_interrupt_resume_without_writes_to_source_to_check_target_state_when_in # Writes one batch ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do + 
info("test[09]: on_status, received -> TERM") ghostferry.send_signal("TERM") end @@ -32,6 +33,7 @@ def test_interrupt_and_resume_without_last_known_schema_cache # Writes one batch ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do + info("test[31]: on_status, received -> TERM") ghostferry.send_signal("TERM") end @@ -448,19 +450,26 @@ def test_interrupt_resume_idempotence_with_multiple_interrupts end def test_interrupt_resume_idempotence_with_multiple_interrupts_and_writes_to_source + @debug_me = true + info("test[452] start\n\n") ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) datawriter = new_source_datawriter start_datawriter_with_ghostferry(datawriter, ghostferry) + info("test[461] ghostferry#run_expecting_interrupt, no state\n\n") dumped_state = ghostferry.run_expecting_interrupt assert_basic_fields_exist_in_dumped_state(dumped_state) ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) + + info("test[466] ghostferry#run_expecting_interrupt, with state\n\n") ghostferry.run_expecting_interrupt(dumped_state) ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) stop_datawriter_during_cutover(datawriter, ghostferry) + + info("test[472] ghostferry#run_with_logs, with state\n\n") ghostferry.run_with_logs(dumped_state) assert_test_table_is_identical diff --git a/test/test_helper.rb b/test/test_helper.rb index d66c4194..6ca72ba1 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -11,7 +11,7 @@ require "data_writer_helper" class LogCapturer - attr_reader :logger + attr_reader :logger, :logger_device def initialize(level: Logger::DEBUG) @capture = ENV["DEBUG"] != "1" @@ -50,6 +50,7 @@ def new_ghostferry(filepath, config: {}) # Transform path to something ruby understands path = File.join(GO_CODE_PATH, filepath, "main.go") g = Ghostferry.new(path, config: config, log_capturer: @log_capturer) + info("[#{g.tag}] new_ghostferry: create") 
@ghostferry_instances << g g end @@ -57,11 +58,16 @@ def new_ghostferry(filepath, config: {}) def new_ghostferry_with_interrupt_after_row_copy(filepath, config: {}, after_batches_written: 0) g = new_ghostferry(filepath, config: config) + info("[#{g.tag}] new_ghostferry_wiarc: register status hook") batches_written = 0 g.on_status(Ghostferry::Status::AFTER_ROW_COPY) do batches_written += 1 + if batches_written >= after_batches_written + info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> true") g.send_signal("TERM") + else + info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> false") end end @@ -83,6 +89,10 @@ def setup_signal_watcher Signal.trap("TERM") { self.on_term } end + def info(msg) + @log_capturer.logger.info(msg) + end + ############## # Test Hooks # ############## @@ -105,6 +115,7 @@ def before_setup # Same thing with DataWriter as above @datawriter_instances = [] + @debug_me = nil end def after_teardown @@ -116,7 +127,7 @@ def after_teardown datawriter.stop_and_join end - @log_capturer.print_output if self.failure + @log_capturer.print_output if self.failure || @debug_me @log_capturer.reset super end From 24b20c1c38058fd5c39da3a9b249b9b030d5de16 Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Thu, 12 Oct 2023 15:01:50 +0200 Subject: [PATCH 09/12] moar debugging --- ferry.go | 1 + test/lib/go/integrationferry/ferry.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/ferry.go b/ferry.go index c933443e..1a327713 100644 --- a/ferry.go +++ b/ferry.go @@ -880,6 +880,7 @@ func (f *Ferry) WaitUntilBinlogStreamerCatchesUp() { // You will know that the BinlogStreamer finished when .Run() returns. 
func (f *Ferry) FlushBinlogAndStopStreaming() { if f.WaitUntilReplicaIsCaughtUpToMaster != nil { + f.logger.Info("flush binlog and stop streaming: wait until replica is caught up to master") isReplica, err := CheckDbIsAReplica(f.WaitUntilReplicaIsCaughtUpToMaster.MasterDB) if err != nil { f.ErrorHandler.Fatal("wait_replica", err) diff --git a/test/lib/go/integrationferry/ferry.go b/test/lib/go/integrationferry/ferry.go index f1b4ce89..155f1f23 100644 --- a/test/lib/go/integrationferry/ferry.go +++ b/test/lib/go/integrationferry/ferry.go @@ -148,11 +148,14 @@ func (f *IntegrationFerry) Main() error { return err } + f.logger.Debug("flush binlog and stop streaming") // TODO: this method should return errors rather than calling // the error handler to panic directly. f.FlushBinlogAndStopStreaming() + f.logger.Debug("wait for Ferry to exit") wg.Wait() + f.logger.Debug("finished waiting") if f.Verifier != nil { err := f.SendStatusAndWaitUntilContinue(StatusVerifyDuringCutover) if err != nil { From 3c4116ff7a0af5bcf07c422b40577fea4ed63c39 Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Tue, 17 Oct 2023 17:14:16 +0200 Subject: [PATCH 10/12] Add retry for resume tests --- Gemfile | 1 + Gemfile.lock | 3 +++ test/helpers/ghostferry_helper.rb | 20 +++++++++++--------- test/lib/go/integrationferry/ferry.go | 3 --- test/main.rb | 2 ++ 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/Gemfile b/Gemfile index e404666d..b3cdbf46 100644 --- a/Gemfile +++ b/Gemfile @@ -7,6 +7,7 @@ group :test do gem "minitest-hooks" gem "minitest-reporters", "~> 1.4" + gem "minitest-retry" gem "minitest-fail-fast", "~> 0.1.0" end diff --git a/Gemfile.lock b/Gemfile.lock index 42ce55a5..c83f95e1 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -16,6 +16,8 @@ GEM builder minitest (>= 5.0) ruby-progressbar + minitest-retry (0.2.2) + minitest (>= 5.0) mysql2 (0.5.5) pry (0.14.2) coderay (~> 1.1) @@ -35,6 +37,7 @@ DEPENDENCIES minitest-fail-fast (~> 0.1.0) minitest-hooks 
minitest-reporters (~> 1.4) + minitest-retry mysql2 pry-byebug tqdm diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 523459ee..4e3a9f15 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -15,8 +15,6 @@ def self.remove_all_binaries FileUtils.remove_entry(GHOSTFERRY_TEMPDIR) if Dir.exist?(GHOSTFERRY_TEMPDIR) end - class GhostferryExitFailure < StandardError - end class Ghostferry # Manages compiling, running, and communicating with Ghostferry. @@ -33,6 +31,10 @@ class Ghostferry # Keep these in sync with integrationferry.go ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT" + Error = Class.new(StandardError) + ExitError = Class.new(Error) + TimeoutError = Class.new(Error) + module Status # This should be in sync with integrationferry.go READY = "READY" @@ -117,8 +119,8 @@ def run(resuming_state = nil) def run_expecting_interrupt(resuming_state = nil) @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt(state:#{(!resuming_state.nil?).inspect})") run(resuming_state) - rescue GhostferryExitFailure - @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt: got GhostferryExitFailure") + rescue ExitError + @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt: got Ghostferry::ExitError") dumped_state = @stdout.join("") JSON.parse(dumped_state) else @@ -131,8 +133,8 @@ def run_expecting_interrupt(resuming_state = nil) def run_expecting_failure(resuming_state = nil) @logger.info("[#{@tag}] ghostferry#run_expecting_failure(state:#{(!resuming_state.nil?).inspect})") run(resuming_state) - rescue GhostferryExitFailure - @logger.info("[#{@tag}] ghostferry#run_expecting_failure: got GhostferryExitFailure") + rescue ExitError + @logger.info("[#{@tag}] ghostferry#run_expecting_failure: got Ghostferry::ExitError") else raise "[#{@tag}] Ghostferry did not fail" end @@ -337,7 +339,7 @@ def start_ghostferry(resuming_state = nil) @logger.debug("[#{@tag}] ghostferry test binary exitted: 
#{@exit_status}") if @exit_status.exitstatus != 0 - raise GhostferryExitFailure, "[#{@tag}] ghostferry test binary returned non-zero status: #{@exit_status}" + raise ExitError, "[#{@tag}] ghostferry test binary returned non-zero status: #{@exit_status}" end end end @@ -351,7 +353,7 @@ def start_server_watchdog if (now - @last_message_time) > @message_timeout @server.shutdown @log_capturer.print_output - raise "[#{@tag}] ghostferry did not report to the integration test server for the last #{@message_timeout}s" + raise TimeoutError, "[#{@tag}] ghostferry did not report to the integration test server for the last #{@message_timeout}s" end sleep 1 @@ -408,7 +410,7 @@ def kill begin @subprocess_thread.join if @subprocess_thread - rescue GhostferryExitFailure + rescue ExitError # ignore end end diff --git a/test/lib/go/integrationferry/ferry.go b/test/lib/go/integrationferry/ferry.go index 155f1f23..f1b4ce89 100644 --- a/test/lib/go/integrationferry/ferry.go +++ b/test/lib/go/integrationferry/ferry.go @@ -148,14 +148,11 @@ func (f *IntegrationFerry) Main() error { return err } - f.logger.Debug("flush binlog and stop streaming") // TODO: this method should return errors rather than calling // the error handler to panic directly. f.FlushBinlogAndStopStreaming() - f.logger.Debug("wait for Ferry to exit") wg.Wait() - f.logger.Debug("finished waiting") if f.Verifier != nil { err := f.SendStatusAndWaitUntilContinue(StatusVerifyDuringCutover) if err != nil { diff --git a/test/main.rb b/test/main.rb index 7c28ab28..eaad9d0c 100644 --- a/test/main.rb +++ b/test/main.rb @@ -15,10 +15,12 @@ require "minitest" require "minitest/reporters" +require "minitest/retry" require "minitest/fail_fast" require "minitest/hooks/test" Minitest::Reporters.use! 
Minitest::Reporters::SpecReporter.new +Minitest::Retry.use!(exceptions_to_retry: [GhostferryHelper::Ghostferry::TimeoutError]) test_files.each do |f| require f From d3c1d39e80017eaf3c88e6e42a538cecfedebc2e Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Tue, 17 Oct 2023 17:45:12 +0200 Subject: [PATCH 11/12] Cleanup code from debugging --- ferry.go | 1 - test/helpers/ghostferry_helper.rb | 64 +++++++---------------- test/integration/interrupt_resume_test.rb | 7 --- test/test_helper.rb | 16 ++---- 4 files changed, 22 insertions(+), 66 deletions(-) diff --git a/ferry.go b/ferry.go index 1a327713..c933443e 100644 --- a/ferry.go +++ b/ferry.go @@ -880,7 +880,6 @@ func (f *Ferry) WaitUntilBinlogStreamerCatchesUp() { // You will know that the BinlogStreamer finished when .Run() returns. func (f *Ferry) FlushBinlogAndStopStreaming() { if f.WaitUntilReplicaIsCaughtUpToMaster != nil { - f.logger.Info("flush binlog and stop streaming: wait until replica is caught up to master") isReplica, err := CheckDbIsAReplica(f.WaitUntilReplicaIsCaughtUpToMaster.MasterDB) if err != nil { f.ErrorHandler.Fatal("wait_replica", err) diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 4e3a9f15..27d1424c 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -6,7 +6,6 @@ require "tmpdir" require "webrick" require "cgi" -require "securerandom" module GhostferryHelper GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration") @@ -50,12 +49,10 @@ module Status AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY" end - attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines, :tag + attr_reader :stdout, :stderr, :logrus_lines, :exit_status, :pid, :error, :error_lines - def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: 39393) - @log_capturer = log_capturer - @logger = log_capturer.logger - @tag = SecureRandom.hex[0..3] + def initialize(main_path, config: {}, logger:, 
message_timeout: 30, port: 39393) + @logger = logger @main_path = main_path @config = config @@ -98,7 +95,6 @@ def initialize(main_path, config: {}, log_capturer:, message_timeout: 30, port: # The main method to call to run a Ghostferry subprocess. def run(resuming_state = nil) - @logger.info("[#{@tag}] ghostferry#run(state:#{(!resuming_state.nil?).inspect})") resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) compile_binary @@ -117,26 +113,21 @@ def run(resuming_state = nil) # When using this method, you need to ensure that the datawriter has been # stopped properly (if you're using stop_datawriter_during_cutover). def run_expecting_interrupt(resuming_state = nil) - @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt(state:#{(!resuming_state.nil?).inspect})") run(resuming_state) rescue ExitError - @logger.info("[#{@tag}] ghostferry#run_expecting_interrupt: got Ghostferry::ExitError") dumped_state = @stdout.join("") JSON.parse(dumped_state) else - @logger.error("[#{@tag}] ghostferry#run_expecting_interrupt: something's wrong") raise "Ghostferry did not get interrupted" end # Same as above - ensure that the datawriter has been # stopped properly (if you're using stop_datawriter_during_cutover). 
def run_expecting_failure(resuming_state = nil) - @logger.info("[#{@tag}] ghostferry#run_expecting_failure(state:#{(!resuming_state.nil?).inspect})") run(resuming_state) rescue ExitError - @logger.info("[#{@tag}] ghostferry#run_expecting_failure: got Ghostferry::ExitError") else - raise "[#{@tag}] Ghostferry did not fail" + raise "Ghostferry did not fail" end def run_with_logs(resuming_state = nil) @@ -150,14 +141,14 @@ def run_with_logs(resuming_state = nil) def compile_binary return if File.exist?(@compiled_binary_path) - @logger.debug("[#{@tag}] compiling test binary to #{@compiled_binary_path}") + @logger.debug("compiling test binary to #{@compiled_binary_path}") rc = system( "go", "build", "-o", @compiled_binary_path, @main_path ) - raise "[#{@tag}] could not compile ghostferry" unless rc + raise "could not compile ghostferry" unless rc end def start_server @@ -182,27 +173,17 @@ def start_server query = CGI::parse(req.body) - statuses = Array(query["status"]) + status = Array(query["status"]).first + data = query["data"] - if statuses.empty? + if status.nil? @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status") resp.status = 400 @server.shutdown - elsif statuses.size > 1 - @logger.warn("[#{@tag}] Got multiple statuses at once: #{statuses.inspect}") - puts "Got multiple statuses at once: #{statuses.inspect}" end @last_message_time = now - - data = query["data"] - - @logger.info("[#{@tag}] server: got / with #{statuses.inspect}") - statuses.each do |status| - next if @status_handlers[status].nil? - - @status_handlers[status].each { |f| f.call(*data) } - end + @status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? rescue StandardError => e # errors are not reported from WEBrick but the server should fail early # as this indicates there is likely a programming error. 
@@ -213,7 +194,6 @@ def start_server @server.mount_proc "/callbacks/progress" do |req, resp| begin - @logger.info("[#{@tag}] server: got /callbacks/progress") unless req.body @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data") resp.status = 400 @@ -228,7 +208,6 @@ def start_server @server.mount_proc "/callbacks/state" do |req, resp| begin - @logger.info("[#{@tag}] server: got /callbacks/state") unless req.body @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send data") resp.status = 400 @@ -241,15 +220,14 @@ def start_server end @server.mount_proc "/callbacks/error" do |req, resp| - @logger.info("[#{@tag}] server: got /callbacks/error") @error = JSON.parse(JSON.parse(req.body)["Payload"]) @callback_handlers["error"].each { |f| f.call(@error) } unless @callback_handlers["error"].nil? end @server_thread = Thread.new do - @logger.debug("[#{@tag}] starting server thread") + @logger.debug("starting server thread") @server.start - @logger.debug("[#{@tag}] server thread stopped") + @logger.debug("server thread stopped") end end @@ -289,7 +267,7 @@ def start_ghostferry(resuming_state = nil) environment["GHOSTFERRY_MARGINALIA"] = @config[:marginalia] end - @logger.debug("[#{@tag}] starting ghostferry test binary #{@compiled_binary_path}") + @logger.debug("starting ghostferry test binary #{@compiled_binary_path}") Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr| stdin.puts(resuming_state) unless resuming_state.nil? 
stdin.close @@ -311,7 +289,7 @@ def start_ghostferry(resuming_state = nil) if reader == stdout @stdout << line - @logger.debug("[#{@tag}] stdout: #{line}") + @logger.debug("stdout: #{line}") elsif reader == stderr @stderr << line if json_log_line?(line) @@ -324,11 +302,8 @@ def start_ghostferry(resuming_state = nil) if logline["level"] == "error" @error_lines << logline end - - @logger.debug("[#{@tag}] stderr: #{line}") unless tag.end_with?("_binlog_streamer") - else - @logger.debug("[#{@tag}] stderr: #{line}") end + @logger.debug("stderr: #{line}") end end end @@ -337,9 +312,9 @@ def start_ghostferry(resuming_state = nil) @pid = 0 end - @logger.debug("[#{@tag}] ghostferry test binary exitted: #{@exit_status}") + @logger.debug("ghostferry test binary exitted: #{@exit_status}") if @exit_status.exitstatus != 0 - raise ExitError, "[#{@tag}] ghostferry test binary returned non-zero status: #{@exit_status}" + raise ExitError, "ghostferry test binary returned non-zero status: #{@exit_status}" end end end @@ -352,15 +327,14 @@ def start_server_watchdog while @subprocess_thread.alive? 
do if (now - @last_message_time) > @message_timeout @server.shutdown - @log_capturer.print_output - raise TimeoutError, "[#{@tag}] ghostferry did not report to the integration test server for the last #{@message_timeout}s" + raise TimeoutError, "ghostferry did not report to the integration test server for the last #{@message_timeout}s" end sleep 1 end @server.shutdown - @logger.debug("[#{@tag}] server watchdog thread stopped") + @logger.debug("server watchdog thread stopped") end @server_watchdog_thread.abort_on_exception = true diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index b6baccc1..0da17264 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -11,7 +11,6 @@ def test_interrupt_resume_without_writes_to_source_to_check_target_state_when_in # Writes one batch ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do - info("test[09]: on_status, received -> TERM") ghostferry.send_signal("TERM") end @@ -33,7 +32,6 @@ def test_interrupt_and_resume_without_last_known_schema_cache # Writes one batch ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do - info("test[31]: on_status, received -> TERM") ghostferry.send_signal("TERM") end @@ -450,26 +448,21 @@ def test_interrupt_resume_idempotence_with_multiple_interrupts end def test_interrupt_resume_idempotence_with_multiple_interrupts_and_writes_to_source - @debug_me = true - info("test[452] start\n\n") ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) datawriter = new_source_datawriter start_datawriter_with_ghostferry(datawriter, ghostferry) - info("test[461] ghostferry#run_expecting_interrupt, no state\n\n") dumped_state = ghostferry.run_expecting_interrupt assert_basic_fields_exist_in_dumped_state(dumped_state) ghostferry = new_ghostferry_with_interrupt_after_row_copy(MINIMAL_GHOSTFERRY, after_batches_written: 2) - info("test[466] 
ghostferry#run_expecting_interrupt, with state\n\n") ghostferry.run_expecting_interrupt(dumped_state) ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) stop_datawriter_during_cutover(datawriter, ghostferry) - info("test[472] ghostferry#run_with_logs, with state\n\n") ghostferry.run_with_logs(dumped_state) assert_test_table_is_identical diff --git a/test/test_helper.rb b/test/test_helper.rb index 6ca72ba1..9b9f8b4f 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -11,7 +11,7 @@ require "data_writer_helper" class LogCapturer - attr_reader :logger, :logger_device + attr_reader :logger def initialize(level: Logger::DEBUG) @capture = ENV["DEBUG"] != "1" @@ -49,8 +49,7 @@ class GhostferryTestCase < Minitest::Test def new_ghostferry(filepath, config: {}) # Transform path to something ruby understands path = File.join(GO_CODE_PATH, filepath, "main.go") - g = Ghostferry.new(path, config: config, log_capturer: @log_capturer) - info("[#{g.tag}] new_ghostferry: create") + g = Ghostferry.new(path, config: config, logger: @log_capturer.logger) @ghostferry_instances << g g end @@ -58,16 +57,12 @@ def new_ghostferry(filepath, config: {}) def new_ghostferry_with_interrupt_after_row_copy(filepath, config: {}, after_batches_written: 0) g = new_ghostferry(filepath, config: config) - info("[#{g.tag}] new_ghostferry_wiarc: register status hook") batches_written = 0 g.on_status(Ghostferry::Status::AFTER_ROW_COPY) do batches_written += 1 if batches_written >= after_batches_written - info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> true") g.send_signal("TERM") - else - info("[#{g.tag}] new_ghostferry_wiarc: on_status #{batches_written} >= #{after_batches_written} -> false") end end @@ -89,10 +84,6 @@ def setup_signal_watcher Signal.trap("TERM") { self.on_term } end - def info(msg) - @log_capturer.logger.info(msg) - end - ############## # Test Hooks # ############## @@ -115,7 +106,6 @@ def before_setup # Same thing with DataWriter 
as above @datawriter_instances = [] - @debug_me = nil end def after_teardown @@ -127,7 +117,7 @@ def after_teardown datawriter.stop_and_join end - @log_capturer.print_output if self.failure || @debug_me + @log_capturer.print_output if self.failure @log_capturer.reset super end From af1981ce448bcc628782bafc08ef1252052b85a7 Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Wed, 18 Oct 2023 13:22:24 +0200 Subject: [PATCH 12/12] Update comment on usage --- test/helpers/ghostferry_helper.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 27d1424c..cfd473dd 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -21,7 +21,7 @@ class Ghostferry # # To use this class: # - # ghostferry = Ghostferry.new("path/to/main.go") + # ghostferry = Ghostferry.new("path/to/main.go", logger: Logger.new(STDOUT)) # ghostferry.on_status(Ghostferry::Status::BEFORE_ROW_COPY) do # # do custom work here, such as injecting data into the database # end