diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index c456be2d0d3cd..99cee19ed1e78 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -1606,6 +1606,10 @@ class ObjectOutputStream final : public io::OutputStream { io::internal::CloseFromDestructor(this); } + std::shared_ptr Self() { + return std::dynamic_pointer_cast(shared_from_this()); + } + Status Init() { ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); @@ -1724,9 +1728,9 @@ class ObjectOutputStream final : public io::OutputStream { RETURN_NOT_OK(EnsureReadyToFlushFromClose()); - auto self = std::dynamic_pointer_cast(shared_from_this()); // Wait for in-progress uploads to finish (if async writes are enabled) - return FlushAsync().Then([self]() { return self->FinishPartUploadAfterFlush(); }); + return FlushAsync().Then( + [self = Self()]() { return self->FinishPartUploadAfterFlush(); }); } bool closed() const override { return closed_; } @@ -1892,7 +1896,15 @@ class ObjectOutputStream final : public io::OutputStream { } // Notify completion if (--state->parts_in_progress == 0) { - state->pending_parts_completed.MarkFinished(state->status); + // GH-41862: avoid potential deadlock if the Future's callback is called + // with the mutex taken. + auto fut = state->pending_parts_completed; + lock.unlock(); + // State could be mutated concurrently if another thread writes to the + // stream, but in this case the Flush() call is only advisory anyway. + // Besides, it's not generally sound to write to an OutputStream from + // several threads at once. + fut.MarkFinished(state->status); } } diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index 7bfa120eda678..5a160a78ceea0 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -614,9 +614,26 @@ class TestS3FS : public S3TestMixin { // after CloseAsync or synchronously after stream.reset() since we're just // checking that `closeAsyncFut` keeps the stream alive until completion // rather than segfaulting on a dangling stream - auto closeAsyncFut = stream->CloseAsync(); + auto close_fut = stream->CloseAsync(); stream.reset(); - ASSERT_OK(closeAsyncFut.MoveResult()); + ASSERT_OK(close_fut.MoveResult()); + AssertObjectContents(client_.get(), "bucket", "somefile", "new data"); + } + + void TestOpenOutputStreamCloseAsyncFutureDeadlock() { + // This is inspired by GH-41862, though it fails to reproduce the actual + // issue reported there (actual preconditions might be required). + // Here we lose our reference to an output stream from its CloseAsync callback. + // This should not deadlock. + std::shared_ptr stream; + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile")); + ASSERT_OK(stream->Write("new data")); + auto close_fut = stream->CloseAsync(); + close_fut.AddCallback([stream = std::move(stream)](Status st) mutable { + // Trigger stream destruction from callback + stream.reset(); + }); + ASSERT_OK(close_fut.MoveResult()); AssertObjectContents(client_.get(), "bucket", "somefile", "new data"); } @@ -1254,6 +1271,16 @@ TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) { TestOpenOutputStreamCloseAsyncDestructor(); } +TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockBackgroundWrites) { + TestOpenOutputStreamCloseAsyncFutureDeadlock(); +} + +TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockSyncWrite) { + options_.background_writes = false; + MakeFileSystem(); + TestOpenOutputStreamCloseAsyncFutureDeadlock(); +} + TEST_F(TestS3FS, OpenOutputStreamMetadata) { std::shared_ptr stream;