Skip to content

Commit

Permalink
GH-41862: [C++][S3] Fix potential deadlock when closing output stream (
Browse files Browse the repository at this point in the history
…#41876)

### Rationale for this change

When the Future returned by `OutputStream::CloseAsync` finishes, it can invoke a user-supplied callback. That callback may well destroy the stream as a side effect. If the stream is a S3 output stream, this might lead to a deadlock involving the mutex in the output stream's `UploadState` structure, since the callback is called with that mutex locked.

### What changes are included in this PR?

Unlock the `UploadState` mutex before marking the Future finished, to avoid deadlocking.

### Are these changes tested?

No. Unfortunately, I wasn't able to write a test that would trigger the original condition. Additional preconditions seem to be required to reproduce the deadlock. For example, it might require a mutex implementation that hangs if destroyed while locked.

### Are there any user-facing changes?

No.

* GitHub Issue: #41862

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
pitrou authored Jun 10, 2024
1 parent 53c1fc9 commit 036fca0
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
18 changes: 15 additions & 3 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,10 @@ class ObjectOutputStream final : public io::OutputStream {
io::internal::CloseFromDestructor(this);
}

std::shared_ptr<ObjectOutputStream> Self() {
return std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
}

Status Init() {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());

Expand Down Expand Up @@ -1724,9 +1728,9 @@ class ObjectOutputStream final : public io::OutputStream {

RETURN_NOT_OK(EnsureReadyToFlushFromClose());

auto self = std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
// Wait for in-progress uploads to finish (if async writes are enabled)
return FlushAsync().Then([self]() { return self->FinishPartUploadAfterFlush(); });
return FlushAsync().Then(
[self = Self()]() { return self->FinishPartUploadAfterFlush(); });
}

bool closed() const override { return closed_; }
Expand Down Expand Up @@ -1892,7 +1896,15 @@ class ObjectOutputStream final : public io::OutputStream {
}
// Notify completion
if (--state->parts_in_progress == 0) {
state->pending_parts_completed.MarkFinished(state->status);
// GH-41862: avoid potential deadlock if the Future's callback is called
// with the mutex taken.
auto fut = state->pending_parts_completed;
lock.unlock();
// State could be mutated concurrently if another thread writes to the
// stream, but in this case the Flush() call is only advisory anyway.
// Besides, it's not generally sound to write to an OutputStream from
// several threads at once.
fut.MarkFinished(state->status);
}
}

Expand Down
31 changes: 29 additions & 2 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -614,9 +614,26 @@ class TestS3FS : public S3TestMixin {
// after CloseAsync or synchronously after stream.reset() since we're just
// checking that `closeAsyncFut` keeps the stream alive until completion
// rather than segfaulting on a dangling stream
auto closeAsyncFut = stream->CloseAsync();
auto close_fut = stream->CloseAsync();
stream.reset();
ASSERT_OK(closeAsyncFut.MoveResult());
ASSERT_OK(close_fut.MoveResult());
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}

void TestOpenOutputStreamCloseAsyncFutureDeadlock() {
// This is inspired by GH-41862, though it fails to reproduce the actual
// issue reported there (actual preconditions might be required).
// Here we lose our reference to an output stream from its CloseAsync callback.
// This should not deadlock.
std::shared_ptr<io::OutputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
ASSERT_OK(stream->Write("new data"));
auto close_fut = stream->CloseAsync();
close_fut.AddCallback([stream = std::move(stream)](Status st) mutable {
// Trigger stream destruction from callback
stream.reset();
});
ASSERT_OK(close_fut.MoveResult());
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}

Expand Down Expand Up @@ -1254,6 +1271,16 @@ TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) {
TestOpenOutputStreamCloseAsyncDestructor();
}

TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockBackgroundWrites) {
TestOpenOutputStreamCloseAsyncFutureDeadlock();
}

TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockSyncWrite) {
options_.background_writes = false;
MakeFileSystem();
TestOpenOutputStreamCloseAsyncFutureDeadlock();
}

TEST_F(TestS3FS, OpenOutputStreamMetadata) {
std::shared_ptr<io::OutputStream> stream;

Expand Down

0 comments on commit 036fca0

Please sign in to comment.