diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index f5620a96622e..824113c4571b 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -194,6 +194,22 @@ uint32_t SharedArbitrator::ExtraConfig::globalArbitrationMemoryReclaimPct( kDefaultGlobalMemoryArbitrationReclaimPct); } +bool SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill( + const std::unordered_map& configs) { + return getConfig( + configs, + kGlobalArbitrationWithoutSpill, + kDefaultGlobalArbitrationWithoutSpill); +} + +double SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio( + const std::unordered_map& configs) { + return getConfig( + configs, + kGlobalArbitrationAbortTimeRatio, + kDefaultGlobalArbitrationAbortTimeRatio); +} + SharedArbitrator::SharedArbitrator(const Config& config) : MemoryArbitrator(config), reservedCapacity_(ExtraConfig::reservedCapacity(config.extraConfigs)), @@ -216,6 +232,10 @@ SharedArbitrator::SharedArbitrator(const Config& config) ExtraConfig::globalArbitrationEnabled(config.extraConfigs)), globalArbitrationMemoryReclaimPct_( ExtraConfig::globalArbitrationMemoryReclaimPct(config.extraConfigs)), + globalArbitrationAbortTimeRatio_( + ExtraConfig::globalArbitrationAbortTimeRatio(config.extraConfigs)), + globalArbitrationWithoutSpill_( + ExtraConfig::globalArbitrationWithoutSpill(config.extraConfigs)), freeReservedCapacity_(reservedCapacity_), freeNonReservedCapacity_(capacity_ - freeReservedCapacity_) { VELOX_CHECK_EQ(kind_, config.kind); @@ -252,7 +272,11 @@ SharedArbitrator::SharedArbitrator(const Config& config) VELOX_MEM_LOG(INFO) << "Arbitration config: max arbitration time " << succinctMillis(maxArbitrationTimeMs_) << ", global memory reclaim percentage " - << globalArbitrationMemoryReclaimPct_; + << globalArbitrationMemoryReclaimPct_ + << ", global arbitration abort time ratio " + << globalArbitrationAbortTimeRatio_ + << ", global arbitration skip spill " 
+ << globalArbitrationWithoutSpill_; } VELOX_MEM_LOG(INFO) << "Memory pool participant config: " << participantConfig_.toString(); @@ -839,7 +863,7 @@ void SharedArbitrator::runGlobalArbitration() { const uint64_t startTimeMs = getCurrentTimeMs(); uint64_t totalReclaimedBytes{0}; - bool reclaimByAbort{false}; + bool shouldReclaimByAbort{false}; uint64_t reclaimedBytes{0}; std::unordered_set reclaimedParticipants; std::unordered_set failedParticipants; @@ -857,19 +881,19 @@ void SharedArbitrator::runGlobalArbitration() { // Check if we need to abort participant to reclaim used memory to // accelerate global arbitration. - // - // TODO: make the time based condition check configurable. - reclaimByAbort = - (getCurrentTimeMs() - startTimeMs) < maxArbitrationTimeMs_ / 2 && - (reclaimByAbort || (allParticipantsReclaimed && reclaimedBytes == 0)); - if (!reclaimByAbort) { + shouldReclaimByAbort = globalArbitrationWithoutSpill_ || + ((getCurrentTimeMs() - startTimeMs) > + maxArbitrationTimeMs_ * globalArbitrationAbortTimeRatio_ && + (shouldReclaimByAbort || + (allParticipantsReclaimed && reclaimedBytes == 0))); + if (shouldReclaimByAbort) { + reclaimedBytes = reclaimUsedMemoryByAbort(/*force=*/true); + } else { reclaimedBytes = reclaimUsedMemoryBySpill( targetBytes, reclaimedParticipants, failedParticipants, allParticipantsReclaimed); - } else { - reclaimedBytes = reclaimUsedMemoryByAbort(/*force=*/true); } totalReclaimedBytes += reclaimedBytes; reclaimUnusedCapacity(); diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index 7d43e2b6eb1e..0c6472dac7f4 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -199,6 +199,25 @@ class SharedArbitrator : public memory::MemoryArbitrator { static uint32_t globalArbitrationMemoryReclaimPct( const std::unordered_map& configs); + /// The ratio used with 'memory-reclaim-max-wait-time', beyond which, global + /// arbitration will no longer reclaim 
memory by spilling, but instead + /// directly abort. It is only in effect when 'global-arbitration-enabled' + /// is true. + static constexpr std::string_view kGlobalArbitrationAbortTimeRatio{ + "global-arbitration-abort-time-ratio"}; + static constexpr double kDefaultGlobalArbitrationAbortTimeRatio{0.5}; + static double globalArbitrationAbortTimeRatio( + const std::unordered_map& configs); + + /// If true, global arbitration will not reclaim memory by spilling, but + /// only by aborting. This flag is only effective if + /// 'global-arbitration-enabled' is true. + static constexpr std::string_view kGlobalArbitrationWithoutSpill{ + "global-arbitration-without-spill"}; + static constexpr bool kDefaultGlobalArbitrationWithoutSpill{false}; + static bool globalArbitrationWithoutSpill( + const std::unordered_map& configs); + /// If true, do sanity check on the arbitrator state on destruction. /// /// TODO: deprecate this flag after all the existing memory leak use cases @@ -565,6 +584,8 @@ class SharedArbitrator : public memory::MemoryArbitrator { const double memoryReclaimThreadsHwMultiplier_; const bool globalArbitrationEnabled_; const uint32_t globalArbitrationMemoryReclaimPct_; + const double globalArbitrationAbortTimeRatio_; + const bool globalArbitrationWithoutSpill_; // The executor used to reclaim memory from multiple participants in parallel // at the background for global arbitration or external memory reclamation. 
diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 67eb1ffa4d91..f2be8b1dbb91 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -450,7 +450,8 @@ class MockSharedArbitrationTest : public testing::Test { kMemoryReclaimThreadsHwMultiplier, std::function arbitrationStateCheckCb = nullptr, bool globalArtbitrationEnabled = true, - uint64_t arbitrationTimeoutMs = 5 * 60 * 1'000) { + uint64_t arbitrationTimeoutMs = 5 * 60 * 1'000, + bool globalArbitrationWithoutSpill = false) { MemoryManagerOptions options; options.allocatorCapacity = memoryCapacity; std::string arbitratorKind = "SHARED"; @@ -483,7 +484,9 @@ class MockSharedArbitrationTest : public testing::Test { {std::string(ExtraConfig::kMemoryReclaimMaxWaitTime), folly::to(arbitrationTimeoutMs) + "ms"}, {std::string(ExtraConfig::kGlobalArbitrationEnabled), - folly::to(globalArtbitrationEnabled)}}; + folly::to(globalArtbitrationEnabled)}, + {std::string(ExtraConfig::kGlobalArbitrationWithoutSpill), + folly::to(globalArbitrationWithoutSpill)}}; options.arbitrationStateCheckCb = std::move(arbitrationStateCheckCb); options.checkUsageLeak = true; manager_ = std::make_unique(options); @@ -595,6 +598,14 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier( emptyConfigs), SharedArbitrator::ExtraConfig::kDefaultMemoryReclaimThreadsHwMultiplier); + ASSERT_EQ( + SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill( + emptyConfigs), + SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationWithoutSpill); + ASSERT_EQ( + SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio( + emptyConfigs), + SharedArbitrator::ExtraConfig::kDefaultGlobalArbitrationAbortTimeRatio); // Testing custom values std::unordered_map configs; @@ -620,6 +631,11 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { 
configs[std::string( SharedArbitrator::ExtraConfig::kMemoryReclaimThreadsHwMultiplier)] = "1.0"; + configs[std::string( + SharedArbitrator::ExtraConfig::kGlobalArbitrationWithoutSpill)] = "true"; + configs[std::string( + SharedArbitrator::ExtraConfig::kGlobalArbitrationAbortTimeRatio)] = "0.8"; + ASSERT_EQ(SharedArbitrator::ExtraConfig::reservedCapacity(configs), 100); ASSERT_EQ( SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(configs), @@ -642,6 +658,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { ASSERT_EQ( SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(configs), 1.0); + ASSERT_EQ( + SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(configs), + true); + ASSERT_EQ( + SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(configs), + 0.8); // Testing invalid values configs[std::string(SharedArbitrator::ExtraConfig::kReservedCapacity)] = @@ -667,6 +689,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { configs[std::string( SharedArbitrator::ExtraConfig::kMemoryReclaimThreadsHwMultiplier)] = "invalid"; + configs[std::string( + SharedArbitrator::ExtraConfig::kGlobalArbitrationWithoutSpill)] = + "invalid"; + configs[std::string( + SharedArbitrator::ExtraConfig::kGlobalArbitrationAbortTimeRatio)] = + "invalid"; VELOX_ASSERT_THROW( SharedArbitrator::ExtraConfig::reservedCapacity(configs), @@ -701,6 +729,12 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) { VELOX_ASSERT_THROW( SharedArbitrator::ExtraConfig::memoryReclaimThreadsHwMultiplier(configs), "Failed while parsing SharedArbitrator configs"); + VELOX_ASSERT_THROW( + SharedArbitrator::ExtraConfig::globalArbitrationWithoutSpill(configs), + "Failed while parsing SharedArbitrator configs"); + VELOX_ASSERT_THROW( + SharedArbitrator::ExtraConfig::globalArbitrationAbortTimeRatio(configs), + "Failed while parsing SharedArbitrator configs"); // Invalid memory reclaim executor hw multiplier. 
VELOX_ASSERT_THROW( setupMemory(kMemoryCapacity, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1), @@ -1677,6 +1711,56 @@ DEBUG_ONLY_TEST_F( ASSERT_EQ(runtimeStats[SharedArbitrator::kLocalArbitrationCount].sum, 1); } +TEST_F(MockSharedArbitrationTest, globalArbitrationWithoutSpill) { + const int64_t memoryCapacity = 512 << 20; + const uint64_t memoryPoolInitCapacity = memoryCapacity / 2; + setupMemory( + memoryCapacity, + 0, + memoryPoolInitCapacity, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + kMemoryReclaimThreadsHwMultiplier, + nullptr, + true, + 5 * 60 * 1'000, + true); + auto triggerTask = addTask(memoryCapacity); + auto* triggerOp = triggerTask->addMemoryOp(false); + triggerOp->allocate(memoryCapacity / 2); + + auto abortTask = addTask(memoryCapacity / 2); + auto* abortOp = abortTask->addMemoryOp(true); + abortOp->allocate(memoryCapacity / 2); + ASSERT_EQ(triggerTask->capacity(), memoryCapacity / 2); + + std::unordered_map runtimeStats; + auto statsWriter = std::make_unique(runtimeStats); + setThreadLocalRunTimeStatWriter(statsWriter.get()); + triggerOp->allocate(memoryCapacity / 2); + + ASSERT_EQ( + runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].count, 1); + ASSERT_GT(runtimeStats[SharedArbitrator::kMemoryArbitrationWallNanos].sum, 0); + ASSERT_EQ( + runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].count, 1); + ASSERT_EQ(runtimeStats[SharedArbitrator::kGlobalArbitrationWaitCount].sum, 1); + ASSERT_EQ(runtimeStats[SharedArbitrator::kLocalArbitrationCount].count, 0); + + ASSERT_TRUE(triggerTask->error() == nullptr); + ASSERT_EQ(triggerTask->capacity(), memoryCapacity); + ASSERT_TRUE(abortTask->error() != nullptr); + VELOX_ASSERT_THROW( + std::rethrow_exception(abortTask->error()), + "Memory pool aborted to reclaim used memory"); +} + DEBUG_ONLY_TEST_F(MockSharedArbitrationTest, multipleGlobalRuns) { const int64_t memoryCapacity = 512 << 20; const uint64_t memoryPoolInitCapacity = memoryCapacity / 2;