Skip to content

Commit

Permalink
[VL] Add config for memory pool init capacity to reduce arbitration times (#5815)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohahaha authored May 22, 2024
1 parent 768301a commit 1c6c7fc
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ abstract class VeloxTPCHTableSupport extends VeloxWholeStageTransformerSuite {
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.sql.files.maxPartitionBytes", "1g")
.set("spark.sql.shuffle.partitions", "1")
.set("spark.gluten.sql.columnar.backend.velox.memInitCapacity", "1m")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ const bool kEnableSystemExceptionStacktraceDefault = true;
const std::string kMemoryUseHugePages = "spark.gluten.sql.columnar.backend.velox.memoryUseHugePages";
const bool kMemoryUseHugePagesDefault = false;

// Config key for the initial capacity of a Velox memory pool. VeloxMemoryManager reads it
// and passes the value to MemoryManagerOptions.memoryPoolInitCapacity.
const std::string kVeloxMemInitCapacity = "spark.gluten.sql.columnar.backend.velox.memInitCapacity";
// Value used when the key is absent from the backend conf: 8 << 20 bytes (8 MiB).
const uint64_t kVeloxMemInitCapacityDefault = 8 << 20;

const std::string kHiveConnectorId = "test-hive";
const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled";

Expand Down
12 changes: 9 additions & 3 deletions cpp/velox/memory/VeloxMemoryManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator {
}

// Arbitrator hook that grows `pool` by `targetBytes`; returns `targetBytes` on success.
//
// NOTE(review): this listing is a diff rendered without +/- markers. The first two
// statements (VELOX_CHECK_EQ / return 0) appear to be the pre-change body removed by this
// commit, and the statements after them its replacement — confirm against the real file.
uint64_t growCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
// Pre-change behavior: any non-zero growth request failed the check.
VELOX_CHECK_EQ(targetBytes, 0, "Gluten has set MemoryManagerOptions.memoryPoolInitCapacity to 0")
return 0;
// Post-change behavior: hold mutex_ for the remainder of the call.
std::lock_guard<std::recursive_mutex> l(mutex_);
// Report the requested bytes to the listener before the pool is grown.
listener_->allocationChanged(targetBytes);
// A false result from grow() is surfaced as a Velox failure rather than a return value.
if (!pool->grow(targetBytes, 0)) {
VELOX_FAIL("Failed to grow root pool's capacity for {}", velox::succinctBytes(targetBytes));
}
return targetBytes;
}

uint64_t shrinkCapacity(velox::memory::MemoryPool* pool, uint64_t targetBytes) override {
Expand Down Expand Up @@ -160,6 +164,8 @@ VeloxMemoryManager::VeloxMemoryManager(
: MemoryManager(), name_(name), listener_(std::move(listener)) {
auto reservationBlockSize = VeloxBackend::get()->getBackendConf()->get<uint64_t>(
kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
auto memInitCapacity =
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity, kVeloxMemInitCapacityDefault);
blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), reservationBlockSize);
listenableAlloc_ = std::make_unique<ListenableMemoryAllocator>(allocator.get(), blockListener_.get());
arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());
Expand All @@ -173,7 +179,7 @@ VeloxMemoryManager::VeloxMemoryManager(
.coreOnAllocationFailureEnabled = false,
.allocatorCapacity = velox::memory::kMaxMemory,
.arbitratorKind = afr.getKind(),
.memoryPoolInitCapacity = 0,
.memoryPoolInitCapacity = memInitCapacity,
.memoryPoolTransferCapacity = reservationBlockSize,
.memoryReclaimWaitMs = 0};
veloxMemoryManager_ = std::make_unique<velox::memory::MemoryManager>(mmOptions);
Expand Down
13 changes: 8 additions & 5 deletions cpp/velox/tests/MemoryManagerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include "benchmarks/common/BenchmarkUtils.h"
#include "compute/VeloxBackend.h"
#include "config/GlutenConfig.h"
#include "config/VeloxConfig.h"
#include "memory/VeloxMemoryManager.h"
#include "velox/common/base/tests/GTestUtils.h"

Expand Down Expand Up @@ -48,7 +48,8 @@ class MemoryManagerTest : public ::testing::Test {
protected:
// Suite-level setup: builds the backend configuration map and initializes the Velox
// backend with it.
//
// NOTE(review): this listing is a diff rendered without +/- markers. The first map-entry
// line (ending in `}};`) appears to be the removed version; the two lines after it are its
// replacement, which additionally sets kVeloxMemInitCapacity to its default value —
// confirm against the real file.
static void SetUpTestCase() {
std::unordered_map<std::string, std::string> conf = {
{kMemoryReservationBlockSize, std::to_string(kMemoryReservationBlockSizeDefault)}};
{kMemoryReservationBlockSize, std::to_string(kMemoryReservationBlockSizeDefault)},
{kVeloxMemInitCapacity, std::to_string(kVeloxMemInitCapacityDefault)}};
initVeloxBackend(conf);
}

Expand Down Expand Up @@ -93,6 +94,8 @@ TEST_F(MemoryManagerTest, memoryPoolWithBlockReseravtion) {
}

TEST_F(MemoryManagerTest, memoryAllocatorWithBlockReservation) {
auto initBytes = listener_->currentBytes();

std::vector<Allocation> allocations;
std::vector<uint64_t> sizes{
kMemoryReservationBlockSizeDefault - 1 * kMB, kMemoryReservationBlockSizeDefault - 2 * kMB};
Expand All @@ -105,7 +108,7 @@ TEST_F(MemoryManagerTest, memoryAllocatorWithBlockReservation) {

EXPECT_EQ(allocator_->getBytes(), currentBytes + size);
EXPECT_EQ(allocator_->peakBytes(), allocator_->getBytes());
EXPECT_EQ(listener_->currentBytes(), (i + 1) * kMemoryReservationBlockSizeDefault);
EXPECT_EQ(listener_->currentBytes(), (i + 1) * kMemoryReservationBlockSizeDefault + initBytes);
EXPECT_EQ(listener_->peakBytes(), listener_->currentBytes());
}

Expand All @@ -114,14 +117,14 @@ TEST_F(MemoryManagerTest, memoryAllocatorWithBlockReservation) {
allocations.pop_back();
allocator_->free(allocation.buffer, allocation.size);
EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
EXPECT_EQ(listener_->currentBytes(), kMemoryReservationBlockSizeDefault);
EXPECT_EQ(listener_->currentBytes(), kMemoryReservationBlockSizeDefault + initBytes);

currentBytes = allocator_->getBytes();
allocation = allocations.back();
allocations.pop_back();
allocator_->free(allocation.buffer, allocation.size);
EXPECT_EQ(allocator_->getBytes(), currentBytes - allocation.size);
EXPECT_EQ(listener_->currentBytes(), 0);
EXPECT_EQ(listener_->currentBytes(), initBytes);

ASSERT_EQ(allocator_->getBytes(), 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,13 @@ object GlutenConfig {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1GB")

// Internal setting: initial capacity for a newly created Velox query memory pool.
// Parsed as a byte size; defaults to "8MB".
// NOTE(review): the key string is the same one the native side declares as
// kVeloxMemInitCapacity in cpp/velox/config/VeloxConfig.h — keep the two in sync.
val COLUMNAR_VELOX_MEM_INIT_CAPACITY =
buildConf("spark.gluten.sql.columnar.backend.velox.memInitCapacity")
.internal()
.doc("The initial memory capacity to reserve for a newly created Velox query memory pool.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8MB")

val COLUMNAR_VELOX_SSD_CACHE_PATH =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath")
.internal()
Expand Down

0 comments on commit 1c6c7fc

Please sign in to comment.