Skip to content

Commit

Permalink
Merge pull request #4340 from sanger/4133-y24-120-bug-taken-link-erro…
Browse files Browse the repository at this point in the history
…r-when-creating-sequencing-batches

Y24-120 Part-2 Bug taken link error when creating sequencing batches
  • Loading branch information
yoldas authored Oct 3, 2024
2 parents 8fee99a + a9ff092 commit 4428fee
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 51 deletions.
77 changes: 77 additions & 0 deletions app/models/asset_link.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,81 @@ def has_#{name}?
end
end
end

# Creates an edge between the ancestor and descendant nodes using save.
#
# This method first attempts to find an existing link between the ancestor
# and descendant. If no link is found, it builds a new edge and saves it.
# If a link is found, it makes the link an edge and saves it.
#
# This method is overridden to handle race conditions in finding an
# existing link and has_duplicates validation. It also assumes that there
# is a unique-together index on ancestor_id and descendant_id columns.
#
# @param ancestor [Dag::Standard::EndPoint] The ancestor node.
# @param descendant [Dag::Standard::EndPoint] The descendant node.
# @return [Boolean] Returns true if the edge is successfully created or
# already exists, false otherwise.
# @raise [ActiveRecord::RecordNotUnique] Re-raises any exception if it is
# not a constraint violation that involves ancestor_id and descendant_id
# columns.
def self.create_edge(ancestor, descendant)
# Two processes try to find an existing link.
link = find_link(ancestor, descendant)
# Either or both may find no link and try to create a new edge.
if link.nil?
edge = build_edge(ancestor, descendant)
result = save_edge_or_handle_error(edge)
return result unless result.nil? # Bubble up.
# Losing process finds the edge created by the winning process.
link = find_link(ancestor, descendant)
end

return if link.nil?

link.make_direct
link.changed? ? link.save : true
end

# Saves the edge between the ancestor and descendant nodes or handles errors.
#
# @param edge [AssetLink] The edge object containing the errors.
# @return [Boolean] Returns true if the edge is successfully saved,
# nil if the error is unique validation or constraint violation,
# false if the error is another validation error.
# @raise [ActiveRecord::RecordNotUnique] Re-raises an exception if the
# exception caught is not a unique constraint violation.
def self.save_edge_or_handle_error(edge)
# Winning process successfully saves the edge (direct link).
return true if edge.save
# has_duplicate validation may see it for the losing process before
# hitting the DB.
return false unless unique_validation_error?(edge) # Bubble up.
edge.errors.clear # Clear all errors and use the existing link.
rescue ActiveRecord::RecordNotUnique => e
# Unique constraint violation is triggered for the losing process after
# hitting the DB.
raise unless unique_violation_error?(edge, e) # Bubble up.
end

# Checks if the validation error includes a specific message indicating a
# unique link already exists.
#
# @param edge [AssetLink] The edge object containing the errors.
# @return [Boolean] Returns true if the errors include the message "Link
# already exists between these points", false otherwise.
def self.unique_validation_error?(edge)
edge.errors[:base].include?('Link already exists between these points')
end

# Checks if the unique constraint violation involves the specified columns.
#
# @param edge [AssetLink] The edge object containing the column names.
# @param exception [ActiveRecord::RecordNotUnique] The exception raised due
# to the unique constraint violation.
# @return [Boolean] Returns true if the exception message includes both the
# ancestor and descendant column names, false otherwise.
def self.unique_violation_error?(edge, exception)
[edge.ancestor_id_column_name, edge.descendant_id_column_name].all? { |col| exception.message.include?(col) }
end
end
26 changes: 26 additions & 0 deletions db/migrate/20240912000109_add_unique_index_to_asset_links.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

# This migration adds a unique-together index on ancestor_id and descendant_id
# in order to prevent duplicate links between the same ancestor and descendant
# labware.
#
# Before this migration, the database allowed duplicate asset_links between the
# same ancestor and descendant labware. Therefore, the migration will fail if
# there are any duplicate links in the database. To fix this, they must be
# removed before running the migration using the rake task:
#
# bundle exec rake 'support:remove_duplicate_asset_links[csv_file_path]'
#
# The rake task will write the removed records into a CSV file that can be used
# for auditing purposes.
#
# Note that the column names in the index name below is used for finding the
# reason of the database unique constraint violation by the AssetLink model.
class AddUniqueIndexToAssetLinks < ActiveRecord::Migration[6.1]
def change
add_index :asset_links,
%i[ancestor_id descendant_id],
unique: true,
name: 'index_asset_links_on_ancestor_id_and_descendant_id'
end
end
1 change: 1 addition & 0 deletions db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
t.integer "count"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["ancestor_id", "descendant_id"], name: "index_asset_links_on_ancestor_id_and_descendant_id", unique: true
t.index ["ancestor_id", "direct"], name: "index_asset_links_on_ancestor_id_and_direct"
t.index ["descendant_id", "direct"], name: "index_asset_links_on_descendant_id_and_direct"
end
Expand Down
280 changes: 280 additions & 0 deletions spec/models/asset_links_create_edge_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe AssetLink, type: :model do
# rubocop:disable RSpec/InstanceVariable,Metrics/MethodLength,RSpec/ExampleLength,RSpec/MultipleExpectations
# Test the overridden create_edge class method.
describe '.create_edge' do
# Wait for child processes to finish.
#
# @param pids [Array<Integer>] Process IDs
# @raise [Timeout::Error] If a child process does not finish within 10 seconds.
# @return [void]
def wait_for_child_processes(pids)
exits = []
pids
.each
.with_index(1) do |pid, index|
Timeout.timeout(10) do
_pid, status = Process.wait2(pid)
exits << status.exitstatus
end
rescue Timeout::Error
raise StandardError, "parent: Timeout waiting for child#{index} to finish"
end

expect(exits).to eq([0, 0])
end

# Delete created records.
after do
@ancestor&.destroy
@descendant&.destroy
@edge&.destroy
end

it 'handles race condition at find_link' do
# In this example, the first and second processes are forked from the
# main process and they are in race condition to find an existing link
# between the ancestor and descendant and create an edge if no link is
# found. Both first and second processes finds no link, but the second
# process creates the edge first. The first also tries to create the
# edge based on the result of the find_link method. Only one link must
# be created.

# Parent
ActiveRecord::Base.connection.reconnect!
@ancestor = ancestor = create(:labware)
@descendant = descendant = create(:labware)
ActiveRecord::Base.connection.commit_db_transaction

# Create a duplex pipe for IPC.
first_socket, second_socket = UNIXSocket.pair

# First child
pid1 =
fork do
ActiveRecord::Base.connection.reconnect!

find_link_call_count = 0
# Patch find_link method
allow(described_class).to receive(:find_link).and_wrap_original do |m, *args|
find_link_call_count += 1
link = m.call(*args)
if find_link_call_count == 1
expect(link).to be_nil # No link found
message = 'paused'
first_socket.send(message, 0) # Notify the second child
message = 'child1: Timeout waiting for resume message'
raise StandardError, message unless first_socket.wait_readable(10)
message = 'resume' # Wait for the second child to create the edge
first_socket.recv(message.size)
elsif find_link_call_count == 2
expect(link).not_to be_nil # Found one now
end
link
end
described_class.create_edge(ancestor, descendant)

ActiveRecord::Base.connection.close
end

# Second child
pid2 =
fork do
ActiveRecord::Base.connection.reconnect!
message = 'child2: Timeout waiting for paused message'
raise StandardError, message unless second_socket.wait_readable(10)
message = 'paused' # Wait for the first child to call find_link
second_socket.recv(message.size)
described_class.create_edge(ancestor, descendant)
message = 'resume' # Notify the first child
second_socket.send(message, 0)
ActiveRecord::Base.connection.close
end

# Parent
wait_for_child_processes([pid1, pid2])

expect(described_class.where(ancestor:, descendant:).count).to eq(1)
@edge = edge = described_class.last
expect(edge.ancestor).to eq(ancestor)
expect(edge.descendant).to eq(descendant)
expect(edge.direct).to be_truthy
expect(edge.count).to eq(1)

ActiveRecord::Base.connection.close
end

it 'handles unique validation error' do
# In this example, the first and second processes are forked from the
# main process. Neither of them finds and existing link between the
# ancestor and descendant. Both try to create the edge. One of them
# succeeds and the other one fails due to the has_duplicates validation.
# Failing process should use the existing link.

# Parent
ActiveRecord::Base.connection.reconnect!
@ancestor = ancestor = create(:labware)
@descendant = descendant = create(:labware)
ActiveRecord::Base.connection.commit_db_transaction

# Create a duplex pipe for IPC.
first_socket, second_socket = UNIXSocket.pair

# First child
pid1 =
fork do
ActiveRecord::Base.connection.reconnect!
find_link_call_count = 0
allow(described_class).to receive(:find_link).and_wrap_original do |m, *args|
find_link_call_count += 1
link = m.call(*args)
if find_link_call_count == 1
expect(link).to be_nil # No link found
message = 'paused' # Notify the second child
first_socket.send(message, 0)
message = 'child1: Timeout waiting for resume message'
raise StandardError, message unless first_socket.wait_readable(10)
message = 'resume' # Wait for the second child to create the edge
first_socket.recv(message.size)
end
link
end

# Patch unique_validation_error? method
unique_validation_error_return_value = nil
allow(described_class).to receive(:unique_validation_error?).and_wrap_original do |m, *args|
unique_validation_error_return_value = m.call(*args)
end

result = described_class.create_edge(ancestor, descendant)
expect(result).to be_truthy
expect(described_class).to have_received(:unique_validation_error?)
expect(unique_validation_error_return_value).to be_truthy

ActiveRecord::Base.connection.close
end

# Second child
pid2 =
fork do
ActiveRecord::Base.connection.reconnect!
message = 'child2: Timeout waiting for paused message'
raise StandardError, message unless second_socket.wait_readable(10)
message = 'paused' # Wait for the first child to call find_link
second_socket.recv(message.size)
described_class.create_edge(ancestor, descendant)
message = 'resume' # Notify the first child
second_socket.send(message, 0)
ActiveRecord::Base.connection.close
end

# Parent
wait_for_child_processes([pid1, pid2])

expect(described_class.where(ancestor:, descendant:).count).to eq(1)
@edge = edge = described_class.last
expect(edge.ancestor).to eq(ancestor)
expect(edge.descendant).to eq(descendant)
expect(edge.direct).to be_truthy
expect(edge.count).to eq(1)
end

it 'handles unique constraint violation' do
# In this example, the first and second processes are forked from the
# main process. Neither of them finds and existing link between the
# ancestor and descendant. Both try to create the edge. One of them
# succeeds and the other one fails due to the database unique constraint
# violation. Failing process should use the existing link.

# Parent
ActiveRecord::Base.connection.reconnect!
@ancestor = ancestor = create(:labware)
@descendant = descendant = create(:labware)
ActiveRecord::Base.connection.commit_db_transaction

# Create a duplex pipe for IPC.
first_socket, second_socket = UNIXSocket.pair

# First child
pid1 =
fork do
ActiveRecord::Base.connection.reconnect!
find_link_call_count = 0
allow(described_class).to receive(:find_link).and_wrap_original do |m, *args|
find_link_call_count += 1
link = m.call(*args)
if find_link_call_count == 1
expect(link).to be_nil
message = 'paused' # Notify the second child
first_socket.send(message, 0)
message = 'child1: Timeout waiting for resume message'
raise StandardError, message unless first_socket.wait_readable(10)
message = 'resume' # Wait for the second child to create the edge
first_socket.recv(message.size)
end
link
end

# Patch has_duplicates method.
# rubocop:disable RSpec/AnyInstance
allow_any_instance_of(Dag::CreateCorrectnessValidator).to receive(:has_duplicates).and_return(false)
# rubocop:enable RSpec/AnyInstance

# Patch save_edge_or_handle_error method.
allow(described_class).to receive(:save_edge_or_handle_error).and_wrap_original do |method, *args|
edge = args[0]
message =
"Duplicate entry '#{ancestor.id}-#{descendant.id}' " \
"for key 'index_asset_links_on_ancestor_id_and_descendant_id'"
exception = ActiveRecord::RecordNotUnique.new(message)
allow(edge).to receive(:save).and_raise(exception)

method.call(*args)
end

# Patch unique_violation_error? method.
unique_violation_error_return_value = nil
allow(described_class).to receive(:unique_violation_error?).and_wrap_original do |m, *args|
unique_violation_error_return_value = m.call(*args)
end

result = described_class.create_edge(ancestor, descendant)
expect(result).to be_truthy
expect(described_class).to have_received(:unique_violation_error?)
expect(unique_violation_error_return_value).to be_truthy

ActiveRecord::Base.connection.close
end

# Second child
pid2 =
fork do
ActiveRecord::Base.connection.reconnect!
message = 'child2: Timeout waiting for paused message'
raise StandardError, message unless second_socket.wait_readable(10)
message = 'paused' # Wait for the first child to call find_link
second_socket.recv(message.size)
described_class.create_edge(ancestor, descendant)
message = 'resume' # Notify the first child
second_socket.send(message, 0)
ActiveRecord::Base.connection.close
end

# Parent
wait_for_child_processes([pid1, pid2])

expect(described_class.where(ancestor:, descendant:).count).to eq(1)
@edge = edge = described_class.last
expect(edge.ancestor).to eq(ancestor)
expect(edge.descendant).to eq(descendant)
expect(edge.direct).to be_truthy
expect(edge.count).to eq(1)

ActiveRecord::Base.connection.close
end
end
# rubocop:enable RSpec/InstanceVariable,Metrics/MethodLength,RSpec/ExampleLength,RSpec/MultipleExpectations
end
Loading

0 comments on commit 4428fee

Please sign in to comment.