Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 2, 2023
1 parent f9a5e11 commit 51fe33a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
1 change: 1 addition & 0 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
return arrow::Status::OK();
}
auto originalState = splitState_;
splitState_ = SplitState::kUnevictable;

int64_t reclaimed = 0;
if (reclaimed < size && shrinkPartitionBuffersBeforeSpill(originalState)) {
Expand Down
47 changes: 47 additions & 0 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,53 @@ TEST_F(VeloxShuffleWriterMemoryTest, kStop) {
}
}

TEST_F(VeloxShuffleWriterMemoryTest, kUnevictable) {
auto delegated = shuffleWriterOptions_.memory_pool;
shuffleWriterOptions_.partitioning_name = "rr";
shuffleWriterOptions_.buffer_size = 4;
auto pool = SelfEvictedMemoryPool(delegated);
shuffleWriterOptions_.memory_pool = &pool;
auto shuffleWriter = createShuffleWriter();

pool.setEvictable(shuffleWriter.get());

ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));

// First evict cached payloads.
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->evictFixedSize(shuffleWriter->cachedPayloadSize(), &evicted));
ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
ASSERT_GT(shuffleWriter->partitionBufferSize(), 0);
// Set limited capacity.
pool.setCapacity(0);
// Evict again. Because no cached payload to evict, it will try to compress and evict partition buffers.
// Throws OOM during allocating compression buffers.
auto status = shuffleWriter->evictFixedSize(shuffleWriter->partitionBufferSize(), &evicted);
ASSERT_TRUE(status.IsOutOfMemory());
}

TEST_F(VeloxShuffleWriterMemoryTest, kUnevictableSingle) {
auto delegated = shuffleWriterOptions_.memory_pool;
shuffleWriterOptions_.partitioning_name = "single";
auto pool = SelfEvictedMemoryPool(delegated);
shuffleWriterOptions_.memory_pool = &pool;
auto shuffleWriter = createShuffleWriter();

pool.setEvictable(shuffleWriter.get());

ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));

// First evict cached payloads.
int64_t evicted;
ASSERT_NOT_OK(shuffleWriter->evictFixedSize(shuffleWriter->cachedPayloadSize(), &evicted));
ASSERT_EQ(shuffleWriter->cachedPayloadSize(), 0);
// Set limited capacity.
pool.setCapacity(0);
// Evict again. Single partitioning doesn't have partition buffers, so the evicted size is 0.
ASSERT_NOT_OK(shuffleWriter->evictFixedSize(shuffleWriter->partitionBufferSize(), &evicted));
ASSERT_EQ(evicted, 0);
}

INSTANTIATE_TEST_SUITE_P(
VeloxShuffleWriteParam,
SinglePartitioningShuffleWriter,
Expand Down

0 comments on commit 51fe33a

Please sign in to comment.