Skip to content

Commit

Permalink
Fix S3 error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
OliLay committed Oct 8, 2024
1 parent 64891d1 commit 6973f76
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
48 changes: 24 additions & 24 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1784,15 +1784,23 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}

Status CleanupIfFailed(Status status) {
if (!status.ok()) {
RETURN_NOT_OK(CleanupAfterClose());
return status;
}
return Status::OK();
}

Status Close() override {
if (closed_) return Status::OK();

RETURN_NOT_OK(EnsureReadyToFlushFromClose());
RETURN_NOT_OK(CleanupIfFailed(EnsureReadyToFlushFromClose()));

RETURN_NOT_OK(Flush());
RETURN_NOT_OK(CleanupIfFailed(Flush()));

if (IsMultipartCreated()) {
RETURN_NOT_OK(FinishPartUploadAfterFlush());
RETURN_NOT_OK(CleanupIfFailed(FinishPartUploadAfterFlush()));
}

return CleanupAfterClose();
Expand All @@ -1801,12 +1809,12 @@ class ObjectOutputStream final : public io::OutputStream {
Future<> CloseAsync() override {
if (closed_) return Status::OK();

RETURN_NOT_OK(EnsureReadyToFlushFromClose());
RETURN_NOT_OK(CleanupIfFailed(EnsureReadyToFlushFromClose()));

// Wait for in-progress uploads to finish (if async writes are enabled)
return FlushAsync().Then([self = Self()]() {
if (self->IsMultipartCreated()) {
RETURN_NOT_OK(self->FinishPartUploadAfterFlush());
RETURN_NOT_OK(self->CleanupIfFailed(self->FinishPartUploadAfterFlush()));
}
return self->CleanupAfterClose();
});
Expand Down Expand Up @@ -2021,7 +2029,7 @@ class ObjectOutputStream final : public io::OutputStream {
std::shared_ptr<UploadState> state,
int32_t part_number,
Aws::S3::Model::PutObjectOutcome outcome) {
HandleUploadUsingSingleRequestOutcome(state, request, outcome.GetResult());
HandleUploadUsingSingleRequestOutcome(state, request, outcome);
return Status::OK();
};

Expand Down Expand Up @@ -2072,7 +2080,7 @@ class ObjectOutputStream final : public io::OutputStream {
std::shared_ptr<UploadState> state,
int32_t part_number,
Aws::S3::Model::UploadPartOutcome outcome) {
HandleUploadPartOutcome(state, part_number, request, outcome.GetResult());
HandleUploadPartOutcome(state, part_number, request, outcome);
return Status::OK();
};

Expand All @@ -2083,16 +2091,12 @@ class ObjectOutputStream final : public io::OutputStream {

static void HandleUploadUsingSingleRequestOutcome(
const std::shared_ptr<UploadState>& state, const S3Model::PutObjectRequest& req,
const Result<S3Model::PutObjectOutcome>& result) {
const S3Model::PutObjectOutcome& outcome) {
std::unique_lock<std::mutex> lock(state->mutex);
if (!result.ok()) {
state->status &= result.status();
} else {
const auto& outcome = *result;
if (!outcome.IsSuccess()) {
state->status &= UploadUsingSingleRequestError(req, outcome);
}
if (!outcome.IsSuccess()) {
state->status &= UploadUsingSingleRequestError(req, outcome);
}

// GH-41862: avoid potential deadlock if the Future's callback is called
// with the mutex taken.
auto fut = state->pending_uploads_completed;
Expand All @@ -2103,18 +2107,14 @@ class ObjectOutputStream final : public io::OutputStream {
static void HandleUploadPartOutcome(const std::shared_ptr<UploadState>& state,
int part_number,
const S3Model::UploadPartRequest& req,
const Result<S3Model::UploadPartOutcome>& result) {
const S3Model::UploadPartOutcome& outcome) {
std::unique_lock<std::mutex> lock(state->mutex);
if (!result.ok()) {
state->status &= result.status();
if (!outcome.IsSuccess()) {
state->status &= UploadPartError(req, outcome);
} else {
const auto& outcome = *result;
if (!outcome.IsSuccess()) {
state->status &= UploadPartError(req, outcome);
} else {
AddCompletedPart(state, part_number, outcome.GetResult());
}
AddCompletedPart(state, part_number, outcome.GetResult());
}

// Notify completion
if (--state->uploads_in_progress == 0) {
// GH-41862: avoid potential deadlock if the Future's callback is called
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,10 @@ class TestS3FS : public S3TestMixin {
void TestOpenOutputStream(bool allow_delayed_open) {
std::shared_ptr<io::OutputStream> stream;

if (!allow_delayed_open) {
if (allow_delayed_open) {
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("nonexistent-bucket/somefile"));
ASSERT_RAISES(IOError, stream->Close());
} else {
// Nonexistent
ASSERT_RAISES(IOError, fs_->OpenOutputStream("nonexistent-bucket/somefile"));
}
Expand Down

0 comments on commit 6973f76

Please sign in to comment.