Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 2, 2023
1 parent 8ac40e3 commit fc1f3f2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 21 deletions.
33 changes: 14 additions & 19 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,12 @@ arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(

class EvictGuard {
public:
explicit EvictGuard(SplitState& splitState, SplitState& evictState)
: splitState_(splitState), evictState_(evictState), originalState_(splitState) {
splitState_ = SplitState::kUnevictable;
evictState_ = originalState_;
explicit EvictGuard(EvictState& evictState) : evictState_(evictState) {
evictState_ = EvictState::kUnevictable;
}

~EvictGuard() {
splitState_ = originalState_;
evictState_ = SplitState::kUnevictable;
evictState_ = EvictState::kEvictable;
}

// For safety and clarity.
Expand All @@ -301,9 +298,7 @@ class EvictGuard {
EvictGuard& operator=(EvictGuard&&) = delete;

private:
SplitState& splitState_;
SplitState& evictState_;
SplitState originalState_;
EvictState& evictState_;
};

template <facebook::velox::TypeKind kind>
Expand Down Expand Up @@ -1419,11 +1414,11 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}

arrow::Status VeloxShuffleWriter::evictFixedSize(int64_t size, int64_t * actual) {
if (splitState_ == SplitState::kUnevictable) {
if (evictState_ == EvictState::kUnevictable) {
*actual = 0;
return arrow::Status::OK();
}
EvictGuard evictGuard{splitState_, evictState_};
EvictGuard evictGuard{evictState_};

int64_t reclaimed = 0;
if (reclaimed < size && shrinkPartitionBuffersBeforeSpill()) {
Expand Down Expand Up @@ -1669,7 +1664,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}

arrow::Result<int64_t> VeloxShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
// Evict partition buffers, only when evictState_ == SplitState::kInit, and space freed from
// Evict partition buffers, only when splitState_ == SplitState::kInit, and space freed from
// shrinking is not enough. In this case partition2BufferSize_ == partitionBufferIdxBase_
int64_t beforeEvict = partitionBufferPool_->bytes_allocated();
int64_t evicted = 0;
Expand Down Expand Up @@ -1712,7 +1707,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
// If OOM happens during stop(), the reclaim order is shrink->spill,
// because the partition buffers will be freed soon.
// SinglePartitioning doesn't maintain partition buffers.
return options_.partitioning_name != "single" && evictState_ == SplitState::kStop;
return options_.partitioning_name != "single" && splitState_ == SplitState::kStop;
}

bool VeloxShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
Expand All @@ -1721,23 +1716,23 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
// The reclaim order is spill->shrink, because the partition buffers can be reused.
// SinglePartitioning doesn't maintain partition buffers.
return options_.partitioning_name != "single" &&
(evictState_ == SplitState::kSplit || evictState_ == SplitState::kInit);
(splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit);
}

bool VeloxShuffleWriter::evictPartitionBuffersAfterSpill() const {
// If OOM triggered by other operators, the evictState_ is SplitState::kInit.
// If OOM triggered by other operators, the splitState_ is SplitState::kInit.
// The last resort is to evict the partition buffers to reclaim more space.
return options_.partitioning_name != "single" && evictState_ == SplitState::kInit;
return options_.partitioning_name != "single" && splitState_ == SplitState::kInit;
}

arrow::Result<uint32_t> VeloxShuffleWriter::partitionBufferSizeAfterShrink(uint32_t partitionId) const {
if (evictState_ == SplitState::kSplit) {
if (splitState_ == SplitState::kSplit) {
return partitionBufferIdxBase_[partitionId] + partition2RowCount_[partitionId];
}
if (evictState_ == kInit || evictState_ == SplitState::kStop) {
if (splitState_ == kInit || splitState_ == SplitState::kStop) {
return partitionBufferIdxBase_[partitionId];
}
return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(evictState_));
return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(splitState_));
}

arrow::Status VeloxShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBufferSize) {
Expand Down
5 changes: 3 additions & 2 deletions cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ namespace gluten {

#endif // end of VELOX_SHUFFLE_WRITER_PRINT

enum SplitState { kInit, kPreAlloc, kSplit, kStop, kUnevictable };
enum SplitState { kInit, kPreAlloc, kSplit, kStop };
enum EvictState { kEvictable, kUnevictable };

class VeloxShuffleWriter final : public ShuffleWriter {
enum { kValidityBufferIndex = 0, kLengthBufferIndex = 1, kValueBufferIndex = 2 };
Expand Down Expand Up @@ -310,7 +311,7 @@ class VeloxShuffleWriter final : public ShuffleWriter {

SplitState splitState_{kInit};

SplitState evictState_{kUnevictable};
EvictState evictState_{kEvictable};

bool supportAvx512_ = false;

Expand Down

0 comments on commit fc1f3f2

Please sign in to comment.