Skip to content

Commit

Permalink
GH-41149: [C++][Acero] Fix asof join race (#41614)
Browse files Browse the repository at this point in the history
### Rationale for this change

Sporadic asof join test failures have been frequently and annoyingly observed in pyarrow CI, as recorded in #40675 and #41149.

Turns out the root causes are the same - a logical race (as opposed to physical race which can be detected by sanitizers). By injecting special delay in various places in asof join, as shown in zanmato1984@ea3b24c, the issue can be reproduced almost 100%. And I have put some descriptions in that commit to explain how the race happens.

### What changes are included in this PR?

Eliminate the logical race of emptiness by combining multiple call-sites of `Empty()`.

### Are these changes tested?

Include the UT to reproduce the issue.

### Are there any user-facing changes?

None.

**This PR contains a "Critical Fix".**
In #40675 and #41149 , incorrect results are produced.
* GitHub Issue: #41149 
* Also closes #40675

Authored-by: Ruoxi Sun <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
zanmato1984 authored May 14, 2024
1 parent a4a5cf1 commit 8f27e26
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 29 deletions.
73 changes: 44 additions & 29 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,10 @@ class InputState {
// true when the queue is empty and, when memo may have future entries (the case of a
// positive tolerance), when the memo is empty.
// used when checking whether RHS is up to date with LHS.
bool CurrentEmpty() const {
return memo_.no_future_ ? Empty() : memo_.times_.empty() && Empty();
// NOTE: The emptiness must be decided by a single call to Empty() in caller, due to the
// potential race with Push(), see GH-41614.
bool CurrentEmpty(bool empty) const {
return memo_.no_future_ ? empty : (memo_.times_.empty() && empty);
}

// in case memo may not have future entries (the case of a non-positive tolerance),
Expand Down Expand Up @@ -650,13 +652,15 @@ class InputState {
// timestamp, update latest_time and latest_ref_row to the value that immediately pass
// the horizon. Update the memo-store with any entries or future entries so observed.
// Returns true if updates were made, false if not.
Result<bool> AdvanceAndMemoize(OnType ts) {
// NOTE: The emptiness must be decided by a single call to Empty() in caller, due to the
// potential race with Push(), see GH-41614.
Result<bool> AdvanceAndMemoize(OnType ts, bool empty) {
// Advance the right side row index until we reach the latest right row (for each key)
// for the given left timestamp.
DEBUG_SYNC(node_, "Advancing input ", index_, DEBUG_MANIP(std::endl));

// Check if already updated for TS (or if there is no latest)
if (Empty()) { // can't advance if empty and no future entries
if (empty) { // can't advance if empty and no future entries
return memo_.no_future_ ? false : memo_.RemoveEntriesWithLesserTime(ts);
}

Expand Down Expand Up @@ -918,34 +922,46 @@ class CompositeTableBuilder {
// guaranteeing this probability is below 1 in a billion. The fix is 128-bit hashing.
// See ARROW-17653
class AsofJoinNode : public ExecNode {
// Advances the RHS as far as possible to be up to date for the current LHS timestamp
Result<bool> UpdateRhs() {
// A simple wrapper for the result of a single call to UpdateRhs(), identifying:
// 1) If any RHS has advanced.
// 2) If all RHS are up to date with LHS.
struct RhsUpdateState {
bool any_advanced;
bool all_up_to_date_with_lhs;
};
// Advances the RHS as far as possible to be up to date for the current LHS timestamp,
// and checks if all RHS are up to date with LHS. The reason they have to be performed
// together is that they both depend on the emptiness of the RHS, which can be changed
// by Push() executing in another thread.
Result<RhsUpdateState> UpdateRhs() {
auto& lhs = *state_.at(0);
auto lhs_latest_time = lhs.GetLatestTime();
bool any_updated = false;
for (size_t i = 1; i < state_.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(bool advanced, state_[i]->AdvanceAndMemoize(lhs_latest_time));
any_updated |= advanced;
}
return any_updated;
}

// Returns false if RHS not up to date for LHS
bool IsUpToDateWithLhsRow() const {
auto& lhs = *state_[0];
if (lhs.Empty()) return false; // can't proceed if nothing on the LHS
OnType lhs_ts = lhs.GetLatestTime();
RhsUpdateState update_state{/*any_advanced=*/false, /*all_up_to_date_with_lhs=*/true};
for (size_t i = 1; i < state_.size(); ++i) {
auto& rhs = *state_[i];
if (!rhs.Finished()) {

// Obtain RHS emptiness once for subsequent AdvanceAndMemoize() and CurrentEmpty().
bool rhs_empty = rhs.Empty();
// Obtain RHS current time here because AdvanceAndMemoize() can change the
// emptiness.
OnType rhs_current_time = rhs_empty ? OnType{} : rhs.GetLatestTime();

ARROW_ASSIGN_OR_RAISE(bool advanced,
rhs.AdvanceAndMemoize(lhs_latest_time, rhs_empty));
update_state.any_advanced |= advanced;

if (update_state.all_up_to_date_with_lhs && !rhs.Finished()) {
// If RHS is finished, then we know it's up to date
if (rhs.CurrentEmpty())
return false; // RHS isn't finished, but is empty --> not up to date
if (lhs_ts > rhs.GetCurrentTime())
return false; // RHS isn't up to date (and not finished)
if (rhs.CurrentEmpty(rhs_empty)) {
// RHS isn't finished, but is empty --> not up to date
update_state.all_up_to_date_with_lhs = false;
} else if (lhs_latest_time > rhs_current_time) {
// RHS isn't up to date (and not finished)
update_state.all_up_to_date_with_lhs = false;
}
}
}
return true;
return update_state;
}

Result<std::shared_ptr<RecordBatch>> ProcessInner() {
Expand All @@ -963,20 +979,19 @@ class AsofJoinNode : public ExecNode {
// If LHS is finished or empty then there's nothing we can do here
if (lhs.Finished() || lhs.Empty()) break;

// Advance each of the RHS as far as possible to be up to date for the LHS timestamp
ARROW_ASSIGN_OR_RAISE(bool any_rhs_advanced, UpdateRhs());
ARROW_ASSIGN_OR_RAISE(auto rhs_update_state, UpdateRhs());

// If we have received enough inputs to produce the next output batch
// (decided by IsUpToDateWithLhsRow), we will perform the join and
// materialize the output batch. The join is done by advancing through
// the LHS and adding joined row to rows_ (done by Emplace). Finally,
// input batches that are no longer needed are removed to free up memory.
if (IsUpToDateWithLhsRow()) {
if (rhs_update_state.all_up_to_date_with_lhs) {
dst.Emplace(state_, tolerance_);
ARROW_ASSIGN_OR_RAISE(bool advanced, lhs.Advance());
if (!advanced) break; // if we can't advance LHS, we're done for this batch
} else {
if (!any_rhs_advanced) break; // need to wait for new data
if (!rhs_update_state.any_advanced) break; // need to wait for new data
}
}

Expand Down
54 changes: 54 additions & 0 deletions cpp/src/arrow/acero/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1678,5 +1678,59 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) {
/*slow_r0=*/false);
}

// Reproduction of GH-40675: A logical race between Process() and Push() that can be more
// easily observed with single small batch.
TEST(AsofJoinTest, RhsEmptinessRace) {
auto left_batch = ExecBatchFromJSON(
{int64(), utf8()}, R"([[1, "a"], [1, "b"], [5, "a"], [6, "b"], [7, "f"]])");
auto right_batch = ExecBatchFromJSON(
{int64(), utf8(), float64()}, R"([[2, "a", 1.0], [9, "b", 3.0], [15, "g", 5.0]])");

Declaration left{
"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("colA", int64()), field("col2", utf8())}),
{std::move(left_batch)})};
Declaration right{
"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("colB", int64()), field("col3", utf8()),
field("colC", float64())}),
{std::move(right_batch)})};
AsofJoinNodeOptions asof_join_opts({{{"colA"}, {{"col2"}}}, {{"colB"}, {{"col3"}}}}, 1);
Declaration asof_join{
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};

ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));

auto exp_batch = ExecBatchFromJSON(
{int64(), utf8(), float64()},
R"([[1, "a", 1.0], [1, "b", null], [5, "a", null], [6, "b", null], [7, "f", null]])");
AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches);
}

// Reproduction of GH-41149: Another case of the same root cause as GH-40675, but with
// empty "by" columns.
TEST(AsofJoinTest, RhsEmptinessRaceEmptyBy) {
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
auto right_batch =
ExecBatchFromJSON({utf8(), int64()}, R"([["Z", 2], ["B", 3], ["A", 4]])");

Declaration left{"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("on", int64())}),
{std::move(left_batch)})};
Declaration right{
"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("colVals", utf8()), field("on", int64())}),
{std::move(right_batch)})};
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
Declaration asof_join{
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};

ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));

auto exp_batch =
ExecBatchFromJSON({int64(), utf8()}, R"([[1, "Z"], [2, "Z"], [3, "B"]])");
AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches);
}

} // namespace acero
} // namespace arrow

0 comments on commit 8f27e26

Please sign in to comment.