Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider transaction rolled back after failure #5

Merged
merged 3 commits into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Add support for Bolt 5.2, which adds notification filtering.
- Add `Driver::is_encrypted()`.
- Reduce the number of lifetime generic parameters in `TransactionQueryBuilder` and `TransactionRecordStream`.
- Fix `Transaction::rolblack()` failing if a result stream failed before.

## 0.0.2
- Update dependencies.
Expand Down
16 changes: 14 additions & 2 deletions neo4j/src/driver/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,26 @@ impl<'driver, 'tx> Transaction<'driver, 'tx> {
/// This is the default behavior when the transaction is dropped.
/// However, when dropping the transaction, potential errors will be swallowed.
pub fn rollback(self) -> Result<()> {
self.drop_result.into_inner()?;
self.inner_tx.rollback()
match self.drop_result.into_inner() {
Ok(_) => self.inner_tx.rollback(),
Err(_) => {
// Nothing to do here.
// The transaction already failed and doesn't need to be rolled back.
Ok(())
}
}
}
}

/// A result cursor as returned by [`TransactionQueryBuilder::run()`].
///
/// It implements [`Iterator`] and can be used to iterate over the [`Record`]s.
///
/// Before ending the transaction ([`Transaction::commit()`] or [`Transaction::rollback()`]), all
/// record streams spawned from it must be dropped.
/// While calling [`drop(stream)`] works fine for this purpose, it will swallow any outstanding
/// errors.
/// Therefore, it is recommended to use [`TransactionRecordStream::consume()`] instead.
#[derive(Debug)]
pub struct TransactionRecordStream<'driver, 'tx>(
RecordStream<'driver>,
Expand Down
9 changes: 3 additions & 6 deletions testkit_backend/src/testkit_backend/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,9 @@ static REGEX_SKIPPED_TESTS: OnceLock<Vec<(&'static Regex, &'static str)>> = Once

fn get_plain_skipped_tests() -> &'static HashMap<&'static str, &'static str> {
PLAIN_SKIPPED_TESTS.get_or_init(|| {
let mut map = HashMap::new();
map.insert(
"neo4j.test_tx_run.TestTxRun.test_tx_res_fails_whole_tx",
"backend (SessionHolder) doesn't keep result buffers around after error",
);
map
HashMap::from([
// ("path.to.skipped_test", "reason"),
])
})
}

Expand Down
34 changes: 22 additions & 12 deletions testkit_backend/src/testkit_backend/session_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,15 +470,10 @@ impl SessionHolderRunner {
}
Err(err) => {
known_transactions.insert(id, TxFailState::Failed);
// outside the receiver, we'll just reply with an
// error to the command. So query and parameters
// don't matter.
_ = buffered_command.insert(TransactionRun {
transaction_id: command.transaction_id,
query: String::from(""),
params: None,
}.into());
break Err(err);
let msg = TransactionRunResult {
result: Err(err.into()),
};
tx_res.send(msg.into()).unwrap();
}
}
}
Expand Down Expand Up @@ -636,9 +631,19 @@ impl SessionHolderRunner {
}

command @ (Command::BeginTransaction(_)
| Command::TransactionFunction(_)|Command::AutoCommit(_)) => command.reply_error(
tx_res, session_already_executing_tx_error()
),
| Command::TransactionFunction(_)|Command::AutoCommit(_)) => {
if known_transactions
.get(&id)
.map(|tx| matches!(tx, TxFailState::Failed))
.unwrap_or(false) {
// transaction failed, therefore, we allow to start a new one
_ = buffered_command.insert(command);
return Ok(());
}
command.reply_error(
tx_res, session_already_executing_tx_error(),
)
}
command @ (Command::RetryablePositive(_)
| Command::RetryableNegative(_)) => {
command.reply_error(
Expand Down Expand Up @@ -696,6 +701,11 @@ impl SessionHolderRunner {
.unwrap();
return;
}
command @ (Command::BeginTransaction(_)
| Command::TransactionFunction(_)|Command::AutoCommit(_)) => {
res.expect("expected failed transaction to not error");
_ = buffered_command.insert(command);
}
_ => panic!(
"left transaction function with unexpected buffered command {command:?}"
),
Expand Down