Skip to content

Commit

Permalink
quick fix
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 2, 2023
1 parent 697b2bd commit c00eab5
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
29 changes: 16 additions & 13 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,15 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(

class EvictGuard {
public:
explicit EvictGuard(SplitState& splitState) : splitState_(splitState) {
oldState_ = splitState;
explicit EvictGuard(SplitState& splitState, SplitState& evictState)
: splitState_(splitState), evictState_(evictState), originalState_(splitState) {
splitState_ = SplitState::kUnevictable;
evictState_ = originalState_;
}

~EvictGuard() {
splitState_ = oldState_;
splitState_ = originalState_;
evictState_ = SplitState::kUnevictable;
}

// For safety and clarity.
Expand All @@ -300,7 +302,8 @@ class EvictGuard {

private:
SplitState& splitState_;
SplitState oldState_;
SplitState& evictState_;
SplitState originalState_;
};

template <facebook::velox::TypeKind kind>
Expand Down Expand Up @@ -1420,7 +1423,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
*actual = 0;
return arrow::Status::OK();
}
EvictGuard{splitState_};
EvictGuard evictGuard{splitState_, evictState_};

int64_t reclaimed = 0;
if (reclaimed < size && shrinkPartitionBuffersBeforeSpill()) {
Expand Down Expand Up @@ -1666,7 +1669,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}

arrow::Result<int64_t> VeloxShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
// Evict partition buffers, only when splitState_ == SplitState::kInit, and space freed from
// Evict partition buffers, only when evictState_ == SplitState::kInit, and space freed from
// shrinking is not enough. In this case partition2BufferSize_ == partitionBufferIdxBase_
int64_t beforeEvict = partitionBufferPool_->bytes_allocated();
int64_t evicted = 0;
Expand Down Expand Up @@ -1709,7 +1712,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
// If OOM happens during stop(), the reclaim order is shrink->spill,
// because the partition buffers will be freed soon.
// SinglePartitioning doesn't maintain partition buffers.
return options_.partitioning_name != "single" && splitState_ == SplitState::kStop;
return options_.partitioning_name != "single" && evictState_ == SplitState::kStop;
}

bool VeloxShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
Expand All @@ -1718,23 +1721,23 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
// The reclaim order is spill->shrink, because the partition buffers can be reused.
// SinglePartitioning doesn't maintain partition buffers.
return options_.partitioning_name != "single" &&
(splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit);
(evictState_ == SplitState::kSplit || evictState_ == SplitState::kInit);
}

bool VeloxShuffleWriter::evictPartitionBuffersAfterSpill() const {
// If OOM triggered by other operators, the splitState_ is SplitState::kInit.
// If OOM triggered by other operators, the evictState_ is SplitState::kInit.
// The last resort is to evict the partition buffers to reclaim more space.
return options_.partitioning_name != "single" && splitState_ == SplitState::kInit;
return options_.partitioning_name != "single" && evictState_ == SplitState::kInit;
}

arrow::Result<uint32_t> VeloxShuffleWriter::partitionBufferSizeAfterShrink(uint32_t partitionId) const {
if (splitState_ == SplitState::kSplit) {
if (evictState_ == SplitState::kSplit) {
return partitionBufferIdxBase_[partitionId] + partition2RowCount_[partitionId];
}
if (splitState_ == kInit || splitState_ == SplitState::kStop) {
if (evictState_ == kInit || evictState_ == SplitState::kStop) {
return partitionBufferIdxBase_[partitionId];
}
return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(splitState_));
return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(evictState_));
}

arrow::Status VeloxShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBufferSize) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ class VeloxShuffleWriter final : public ShuffleWriter {

SplitState splitState_{kInit};

SplitState evictState_{kUnevictable};

bool supportAvx512_ = false;

// store arrow column types
Expand Down

0 comments on commit c00eab5

Please sign in to comment.