diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc b/cpp/velox/shuffle/VeloxShuffleWriter.cc index b6f53faea2f9..c8e94ad4dc52 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc @@ -283,13 +283,15 @@ arrow::Result> makeUncompressedRecordBatch( class EvictGuard { public: - explicit EvictGuard(SplitState& splitState) : splitState_(splitState) { - oldState_ = splitState; + explicit EvictGuard(SplitState& splitState, SplitState& evictState) + : splitState_(splitState), evictState_(evictState), originalState_(splitState) { splitState_ = SplitState::kUnevictable; + evictState_ = originalState_; } ~EvictGuard() { - splitState_ = oldState_; + splitState_ = originalState_; + evictState_ = SplitState::kUnevictable; } // For safety and clarity. @@ -300,7 +302,8 @@ class EvictGuard { private: SplitState& splitState_; - SplitState oldState_; + SplitState& evictState_; + SplitState originalState_; }; template @@ -1420,7 +1423,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel *actual = 0; return arrow::Status::OK(); } - EvictGuard{splitState_}; + EvictGuard evictGuard{splitState_, evictState_}; int64_t reclaimed = 0; if (reclaimed < size && shrinkPartitionBuffersBeforeSpill()) { @@ -1666,7 +1669,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel } arrow::Result VeloxShuffleWriter::evictPartitionBuffersMinSize(int64_t size) { - // Evict partition buffers, only when splitState_ == SplitState::kInit, and space freed from + // Evict partition buffers, only when evictState_ == SplitState::kInit, and space freed from // shrinking is not enough. In this case partition2BufferSize_ == partitionBufferIdxBase_ int64_t beforeEvict = partitionBufferPool_->bytes_allocated(); int64_t evicted = 0; @@ -1709,7 +1712,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel // If OOM happens during stop(), the reclaim order is shrink->spill, // because the partition buffers will be freed soon. // SinglePartitioning doesn't maintain partition buffers. - return options_.partitioning_name != "single" && splitState_ == SplitState::kStop; + return options_.partitioning_name != "single" && evictState_ == SplitState::kStop; } bool VeloxShuffleWriter::shrinkPartitionBuffersAfterSpill() const { @@ -1718,23 +1721,23 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel // The reclaim order is spill->shrink, because the partition buffers can be reused. // SinglePartitioning doesn't maintain partition buffers. return options_.partitioning_name != "single" && - (splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit); + (evictState_ == SplitState::kSplit || evictState_ == SplitState::kInit); } bool VeloxShuffleWriter::evictPartitionBuffersAfterSpill() const { - // If OOM triggered by other operators, the splitState_ is SplitState::kInit. + // If OOM triggered by other operators, the evictState_ is SplitState::kInit. // The last resort is to evict the partition buffers to reclaim more space. - return options_.partitioning_name != "single" && splitState_ == SplitState::kInit; + return options_.partitioning_name != "single" && evictState_ == SplitState::kInit; } arrow::Result VeloxShuffleWriter::partitionBufferSizeAfterShrink(uint32_t partitionId) const { - if (splitState_ == SplitState::kSplit) { + if (evictState_ == SplitState::kSplit) { return partitionBufferIdxBase_[partitionId] + partition2RowCount_[partitionId]; } - if (splitState_ == kInit || splitState_ == SplitState::kStop) { + if (evictState_ == kInit || evictState_ == SplitState::kStop) { return partitionBufferIdxBase_[partitionId]; } - return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(splitState_)); + return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(evictState_)); } arrow::Status VeloxShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBufferSize) { diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index 2f1e868ec24e..76f878d7ba75 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -310,6 +310,8 @@ class VeloxShuffleWriter final : public ShuffleWriter { SplitState splitState_{kInit}; + SplitState evictState_{kUnevictable}; + bool supportAvx512_ = false; // store arrow column types