Skip to content

Commit

Permalink
Add arbitration lock time out to shared arbitrator
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Nov 5, 2024
1 parent f9b24d5 commit 5159c25
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 198 deletions.
44 changes: 35 additions & 9 deletions velox/common/memory/ArbitrationParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ uint64_t ArbitrationParticipant::reclaim(
if (targetBytes == 0) {
return 0;
}
std::lock_guard<std::timed_mutex> l(reclaimLock_);
ArbitrationOperationTimedLock l(reclaimMutex_);
TestValue::adjust(
"facebook::velox::memory::ArbitrationParticipant::reclaim", this);
uint64_t reclaimedBytes{0};
Expand Down Expand Up @@ -320,7 +320,7 @@ uint64_t ArbitrationParticipant::shrinkLocked(bool reclaimAll) {

uint64_t ArbitrationParticipant::abort(
const std::exception_ptr& error) noexcept {
std::lock_guard<std::timed_mutex> l(reclaimLock_);
ArbitrationOperationTimedLock l(reclaimMutex_);
return abortLocked(error);
}

Expand Down Expand Up @@ -353,13 +353,6 @@ uint64_t ArbitrationParticipant::abortLocked(
return shrinkLocked(/*reclaimAll=*/true);
}

bool ArbitrationParticipant::waitForReclaimOrAbort(
uint64_t maxWaitTimeNs) const {
std::unique_lock<std::timed_mutex> l(
reclaimLock_, std::chrono::nanoseconds(maxWaitTimeNs));
return l.owns_lock();
}

bool ArbitrationParticipant::hasRunningOp() const {
std::lock_guard<std::mutex> l(stateLock_);
return runningOp_ != nullptr;
Expand Down Expand Up @@ -408,4 +401,37 @@ std::string ArbitrationCandidate::toString() const {
succinctBytes(reclaimableUsedCapacity),
succinctBytes(reclaimableFreeCapacity));
}

ArbitrationOperationTimedLock::ArbitrationOperationTimedLock(
std::timed_mutex& mutex)
: mutex_(mutex) {
auto arbitrationContext = memoryArbitrationContext();
if (arbitrationContext == nullptr) {
mutex_.lock();
return;
}
auto* operation = arbitrationContext->op;
if (operation == nullptr) {
VELOX_CHECK_EQ(
MemoryArbitrationContext::typeName(arbitrationContext->type),
MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type::kGlobal));
mutex_.lock();
return;
}
VELOX_CHECK_EQ(
MemoryArbitrationContext::typeName(arbitrationContext->type),
MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type::kLocal));
if (!mutex_.try_lock_for(std::chrono::nanoseconds(operation->timeoutNs()))) {
VELOX_MEM_ARBITRATION_TIMEOUT(fmt::format(
"Memory arbitration lock timed out on memory pool: {} after running {}",
operation->participant()->name(),
succinctNanos(operation->executionTimeNs())));
}
}

ArbitrationOperationTimedLock::~ArbitrationOperationTimedLock() {
mutex_.unlock();
}
} // namespace facebook::velox::memory
37 changes: 28 additions & 9 deletions velox/common/memory/ArbitrationParticipant.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,37 @@
#include "velox/common/memory/Memory.h"

namespace facebook::velox::memory {

#define VELOX_MEM_ARBITRATION_TIMEOUT(errorMessage) \
_VELOX_THROW( \
::facebook::velox::VeloxRuntimeError, \
::facebook::velox::error_source::kErrorSourceRuntime.c_str(), \
::facebook::velox::error_code::kMemArbitrationTimeout.c_str(), \
/* isRetriable */ true, \
"{}", \
errorMessage);

namespace test {
class ArbitrationParticipantTestHelper;
}

class ArbitrationOperation;
class ScopedArbitrationParticipant;

/// Custom lock that keeps track of the time of the ongoing arbitration
/// operation while waiting for the lock. The lock will identify if it needs to
/// apply a wait timeout by checking arbitrationCtx thread local variable. If a
/// local arbitration is ongoing on the current locking thread, timeout will
/// automatically be applied.
class ArbitrationOperationTimedLock {
public:
explicit ArbitrationOperationTimedLock(std::timed_mutex& mutex);
~ArbitrationOperationTimedLock();

private:
std::timed_mutex& mutex_;
};

/// Manages the memory arbitration operations on a query memory pool. It also
/// tracks the arbitration stats during the query memory pool's lifecycle.
class ArbitrationParticipant
Expand Down Expand Up @@ -154,9 +178,9 @@ class ArbitrationParticipant
/// which ensures the liveness of underlying query memory pool. If the query
/// memory pool is being destroyed, then this function returns std::nullopt.
///
// NOTE: it is not safe to directly access arbitration participant as it only
// holds a weak ptr to the query memory pool. Use 'lock()' to get a scoped
// arbitration participant for access.
/// NOTE: it is not safe to directly access arbitration participant as it only
/// holds a weak ptr to the query memory pool. Use 'lock()' to get a scoped
/// arbitration participant for access.
std::optional<ScopedArbitrationParticipant> lock();

/// Returns the corresponding query memory pool.
Expand Down Expand Up @@ -223,11 +247,6 @@ class ArbitrationParticipant
return aborted_;
}

/// Invoked to wait for the pending memory reclaim or abort operation to
/// complete within a 'maxWaitTimeMs' time window. The function returns false
/// if the wait has timed out.
bool waitForReclaimOrAbort(uint64_t maxWaitTimeNs) const;

/// Invoked to start arbitration operation 'op'. The operation needs to wait
/// for the prior arbitration operations to finish first before executing to
/// ensure the serialized execution of arbitration operations from the same
Expand Down Expand Up @@ -333,7 +352,7 @@ class ArbitrationParticipant
tsan_atomic<uint64_t> reclaimedBytes_{0};
tsan_atomic<uint64_t> growBytes_{0};

mutable std::timed_mutex reclaimLock_;
mutable std::timed_mutex reclaimMutex_;

friend class ScopedArbitrationParticipant;
friend class test::ArbitrationParticipantTestHelper;
Expand Down
1 change: 0 additions & 1 deletion velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,6 @@ std::shared_ptr<MemoryPool> deprecatedAddDefaultLeafMemoryPool(
/// using this method can get a pool that is shared with other threads. The goal
/// is to minimize lock contention while supporting such use cases.
///
///
/// TODO: deprecate this API after all the use cases are able to manage the
/// lifecycle of the allocated memory pools properly.
MemoryPool& deprecatedSharedLeafPool();
Expand Down
14 changes: 10 additions & 4 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,12 @@ bool MemoryArbitrator::Stats::operator<=(const Stats& other) const {
return !(*this > other);
}

MemoryArbitrationContext::MemoryArbitrationContext(const MemoryPool* requestor)
: type(Type::kLocal), requestorName(requestor->name()) {}
MemoryArbitrationContext::MemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* _op)
: type(Type::kLocal), requestorName(requestor->name()), op(_op) {
VELOX_CHECK_NOT_NULL(op);
}

std::string MemoryArbitrationContext::typeName(
MemoryArbitrationContext::Type type) {
Expand All @@ -465,8 +469,10 @@ std::string MemoryArbitrationContext::typeName(
}

ScopedMemoryArbitrationContext::ScopedMemoryArbitrationContext(
const MemoryPool* requestor)
: savedArbitrationCtx_(arbitrationCtx), currentArbitrationCtx_(requestor) {
const MemoryPool* requestor,
ArbitrationOperation* op)
: savedArbitrationCtx_(arbitrationCtx),
currentArbitrationCtx_(requestor, op) {
arbitrationCtx = &currentArbitrationCtx_;
}

Expand Down
25 changes: 17 additions & 8 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
namespace facebook::velox::memory {

class MemoryPool;
class ArbitrationOperation;

using MemoryArbitrationStateCheckCB = std::function<void(MemoryPool&)>;

Expand Down Expand Up @@ -398,11 +399,11 @@ class NonReclaimableSectionGuard {
const bool oldNonReclaimableSectionValue_;
};

/// The memory arbitration context which is set on per-thread local variable by
/// memory arbitrator. It is used to indicate a running thread is under memory
/// arbitration processing or not. This helps to enable sanity check such as all
/// the memory reservations during memory arbitration should come from the
/// spilling memory pool.
/// The memory arbitration context which is set as per-thread local variable by
/// memory arbitrator. It is used to indicate if a running thread is under
/// memory arbitration. This helps to enable sanity check such as all the memory
/// reservations during memory arbitration should come from the spilling memory
/// pool.
struct MemoryArbitrationContext {
/// Defines the type of memory arbitration.
enum class Type {
Expand All @@ -420,20 +421,28 @@ struct MemoryArbitrationContext {
/// global memory arbitration type.
const std::string requestorName;

explicit MemoryArbitrationContext(const MemoryPool* requestor);
ArbitrationOperation* const op;

MemoryArbitrationContext() : type(Type::kGlobal) {}
MemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* _op);

MemoryArbitrationContext() : type(Type::kGlobal), op(nullptr) {}
};

/// Object used to set/restore the memory arbitration context when a thread is
/// under memory arbitration processing.
class ScopedMemoryArbitrationContext {
public:
explicit ScopedMemoryArbitrationContext(const MemoryPool* requestor);
ScopedMemoryArbitrationContext();

explicit ScopedMemoryArbitrationContext(
const MemoryArbitrationContext* context);

ScopedMemoryArbitrationContext(
const MemoryPool* requestor,
ArbitrationOperation* op);

~ScopedMemoryArbitrationContext();

private:
Expand Down
Loading

0 comments on commit 5159c25

Please sign in to comment.