From c50c4fa60d1a8bf6e7a80bccda92da942f122cbf Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Tue, 8 Oct 2024 17:42:34 +0200 Subject: [PATCH] GH-44334: [C++] Fix S3 error handling in `ObjectOutputStream` (#44335) ### Rationale for this change See [#GH-44334](https://github.com/apache/arrow/issues/44334). Errors from the AWS SDK are not correctly propagated to the user of the `ObjectOutputStream`, so in some cases no error is indicated even though one occurred. ### What changes are included in this PR? - Directly pass the outcome of the AWS SDK to `HandleUploadUsingSingleRequestOutcome` as well as `HandleUploadPartOutcome` instead of wrapping it in an Arrow `Result` that was constructed implicitly and therefore always indicated success. - Adjust cleanup handling in `Close` so that the output stream is closed if there was an error in any of the called methods. Otherwise, destructing the output stream in debug builds fails, as we abort if `Close()` returns anything other than `Status::OK()`. See the [code pointer here](https://github.com/apache/arrow/blob/64891d1d176dd45f3fae574e1bcfac6fee197e5f/cpp/src/arrow/io/interfaces.cc#L293). ### Are these changes tested? - Added assertions checking that `Close()` returns an error in case `delayed_open` is enabled. ### Are there any user-facing changes? No. 
* GitHub Issue: #44334 Authored-by: Oliver Layer Signed-off-by: Antoine Pitrou --- cpp/src/arrow/filesystem/s3fs.cc | 48 +++++++++++++-------------- cpp/src/arrow/filesystem/s3fs_test.cc | 5 ++- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 3a0ade3d2e322..13d6ead6ef686 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -1784,15 +1784,23 @@ class ObjectOutputStream final : public io::OutputStream { return Status::OK(); } + Status CleanupIfFailed(Status status) { + if (!status.ok()) { + RETURN_NOT_OK(CleanupAfterClose()); + return status; + } + return Status::OK(); + } + Status Close() override { if (closed_) return Status::OK(); - RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + RETURN_NOT_OK(CleanupIfFailed(EnsureReadyToFlushFromClose())); - RETURN_NOT_OK(Flush()); + RETURN_NOT_OK(CleanupIfFailed(Flush())); if (IsMultipartCreated()) { - RETURN_NOT_OK(FinishPartUploadAfterFlush()); + RETURN_NOT_OK(CleanupIfFailed(FinishPartUploadAfterFlush())); } return CleanupAfterClose(); @@ -1801,12 +1809,12 @@ class ObjectOutputStream final : public io::OutputStream { Future<> CloseAsync() override { if (closed_) return Status::OK(); - RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + RETURN_NOT_OK(CleanupIfFailed(EnsureReadyToFlushFromClose())); // Wait for in-progress uploads to finish (if async writes are enabled) return FlushAsync().Then([self = Self()]() { if (self->IsMultipartCreated()) { - RETURN_NOT_OK(self->FinishPartUploadAfterFlush()); + RETURN_NOT_OK(self->CleanupIfFailed(self->FinishPartUploadAfterFlush())); } return self->CleanupAfterClose(); }); @@ -2021,7 +2029,7 @@ class ObjectOutputStream final : public io::OutputStream { std::shared_ptr state, int32_t part_number, Aws::S3::Model::PutObjectOutcome outcome) { - HandleUploadUsingSingleRequestOutcome(state, request, outcome.GetResult()); + HandleUploadUsingSingleRequestOutcome(state, 
request, outcome); return Status::OK(); }; @@ -2072,7 +2080,7 @@ class ObjectOutputStream final : public io::OutputStream { std::shared_ptr state, int32_t part_number, Aws::S3::Model::UploadPartOutcome outcome) { - HandleUploadPartOutcome(state, part_number, request, outcome.GetResult()); + HandleUploadPartOutcome(state, part_number, request, outcome); return Status::OK(); }; @@ -2083,16 +2091,12 @@ class ObjectOutputStream final : public io::OutputStream { static void HandleUploadUsingSingleRequestOutcome( const std::shared_ptr& state, const S3Model::PutObjectRequest& req, - const Result& result) { + const S3Model::PutObjectOutcome& outcome) { std::unique_lock lock(state->mutex); - if (!result.ok()) { - state->status &= result.status(); - } else { - const auto& outcome = *result; - if (!outcome.IsSuccess()) { - state->status &= UploadUsingSingleRequestError(req, outcome); - } + if (!outcome.IsSuccess()) { + state->status &= UploadUsingSingleRequestError(req, outcome); } + // GH-41862: avoid potential deadlock if the Future's callback is called // with the mutex taken. 
auto fut = state->pending_uploads_completed; @@ -2103,18 +2107,14 @@ class ObjectOutputStream final : public io::OutputStream { static void HandleUploadPartOutcome(const std::shared_ptr& state, int part_number, const S3Model::UploadPartRequest& req, - const Result& result) { + const S3Model::UploadPartOutcome& outcome) { std::unique_lock lock(state->mutex); - if (!result.ok()) { - state->status &= result.status(); + if (!outcome.IsSuccess()) { + state->status &= UploadPartError(req, outcome); } else { - const auto& outcome = *result; - if (!outcome.IsSuccess()) { - state->status &= UploadPartError(req, outcome); - } else { - AddCompletedPart(state, part_number, outcome.GetResult()); - } + AddCompletedPart(state, part_number, outcome.GetResult()); } + // Notify completion if (--state->uploads_in_progress == 0) { // GH-41862: avoid potential deadlock if the Future's callback is called diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index b8f497d23c9a3..43091aaa986d9 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -572,7 +572,10 @@ class TestS3FS : public S3TestMixin { void TestOpenOutputStream(bool allow_delayed_open) { std::shared_ptr stream; - if (!allow_delayed_open) { + if (allow_delayed_open) { + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("nonexistent-bucket/somefile")); + ASSERT_RAISES(IOError, stream->Close()); + } else { // Nonexistent ASSERT_RAISES(IOError, fs_->OpenOutputStream("nonexistent-bucket/somefile")); }