forked from facebookincubator/velox
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add arbitration participant and operation objects to support global m…
…emory arbitration optimization (facebookincubator#11074) Summary: Pull Request resolved: facebookincubator#11074 Add arbitration participant object to provide all the required arbitration operations and state management on a query memory pool inside the memory arbitrator, such as arbitration queue to serialize the arbitration request execution from the same query and the serialize the reclaim, shrink, grow and abort from either arbitration request and the background memory arbitrations. Add arbitration operation object to manage a memory arbitration request execution Reviewed By: tanjialiang Differential Revision: D63055730 fbshipit-source-id: 0db85eccf6c383807eb006f1fcfad8cb0b0aa596
- Loading branch information
1 parent
cc46d81
commit dcaae29
Showing
6 changed files
with
2,862 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include "velox/common/memory/ArbitrationOperation.h" | ||
#include <mutex> | ||
|
||
#include "velox/common/base/Exceptions.h" | ||
#include "velox/common/base/RuntimeMetrics.h" | ||
#include "velox/common/memory/Memory.h" | ||
#include "velox/common/testutil/TestValue.h" | ||
#include "velox/common/time/Timer.h" | ||
|
||
using facebook::velox::common::testutil::TestValue; | ||
|
||
namespace facebook::velox::memory { | ||
using namespace facebook::velox::memory; | ||
|
||
ArbitrationOperation::ArbitrationOperation( | ||
ScopedArbitrationParticipant&& participant, | ||
uint64_t requestBytes, | ||
uint64_t timeoutMs) | ||
: requestBytes_(requestBytes), | ||
timeoutMs_(timeoutMs), | ||
createTimeMs_(getCurrentTimeMs()), | ||
participant_(std::move(participant)) { | ||
VELOX_CHECK_GT(requestBytes_, 0); | ||
} | ||
|
||
ArbitrationOperation::~ArbitrationOperation() { | ||
VELOX_CHECK_NE( | ||
state_, | ||
State::kRunning, | ||
"Unexpected arbitration operation state on destruction"); | ||
VELOX_CHECK( | ||
allocatedBytes_ == 0 || allocatedBytes_ >= requestBytes_, | ||
"Unexpected allocatedBytes_ {} vs requestBytes_ {}", | ||
succinctBytes(allocatedBytes_), | ||
succinctBytes(requestBytes_)); | ||
} | ||
|
||
std::string ArbitrationOperation::stateName(State state) { | ||
switch (state) { | ||
case State::kInit: | ||
return "init"; | ||
case State::kWaiting: | ||
return "waiting"; | ||
case State::kRunning: | ||
return "running"; | ||
case State::kFinished: | ||
return "finished"; | ||
default: | ||
return fmt::format("unknown state: {}", static_cast<int>(state)); | ||
} | ||
} | ||
|
||
void ArbitrationOperation::setState(State state) { | ||
switch (state) { | ||
case State::kWaiting: | ||
VELOX_CHECK_EQ(state_, State::kInit); | ||
break; | ||
case State::kRunning: | ||
VELOX_CHECK(this->state_ == State::kWaiting || state_ == State::kInit); | ||
break; | ||
case State::kFinished: | ||
VELOX_CHECK_EQ(this->state_, State::kRunning); | ||
break; | ||
default: | ||
VELOX_UNREACHABLE( | ||
"Unexpected state transition from {} to {}", state_, state); | ||
break; | ||
} | ||
state_ = state; | ||
} | ||
|
||
void ArbitrationOperation::start() { | ||
VELOX_CHECK_EQ(state_, State::kInit); | ||
participant_->startArbitration(this); | ||
setState(ArbitrationOperation::State::kRunning); | ||
} | ||
|
||
void ArbitrationOperation::finish() { | ||
setState(State::kFinished); | ||
VELOX_CHECK_EQ(finishTimeMs_, 0); | ||
finishTimeMs_ = getCurrentTimeMs(); | ||
participant_->finishArbitration(this); | ||
} | ||
|
||
bool ArbitrationOperation::aborted() const { | ||
return participant_->aborted(); | ||
} | ||
|
||
size_t ArbitrationOperation::executionTimeMs() const { | ||
if (state_ == State::kFinished) { | ||
VELOX_CHECK_GE(finishTimeMs_, createTimeMs_); | ||
return finishTimeMs_ - createTimeMs_; | ||
} else { | ||
const auto currentTimeMs = getCurrentTimeMs(); | ||
VELOX_CHECK_GE(currentTimeMs, createTimeMs_); | ||
return currentTimeMs - createTimeMs_; | ||
} | ||
} | ||
|
||
bool ArbitrationOperation::hasTimeout() const { | ||
return state_ != State::kFinished && timeoutMs() <= 0; | ||
} | ||
|
||
size_t ArbitrationOperation::timeoutMs() const { | ||
if (state_ == State::kFinished) { | ||
return 0; | ||
} | ||
const auto execTimeMs = executionTimeMs(); | ||
if (execTimeMs >= timeoutMs_) { | ||
return 0; | ||
} | ||
return timeoutMs_ - execTimeMs; | ||
} | ||
|
||
void ArbitrationOperation::setGrowTargets() { | ||
// We shall only set grow targets once after start execution. | ||
VELOX_CHECK_EQ(state_, State::kRunning); | ||
VELOX_CHECK( | ||
maxGrowBytes_ == 0 && minGrowBytes_ == 0, | ||
"Arbitration operation grow targets have already been set: {}/{}", | ||
succinctBytes(maxGrowBytes_), | ||
succinctBytes(minGrowBytes_)); | ||
participant_->getGrowTargets(requestBytes_, maxGrowBytes_, minGrowBytes_); | ||
VELOX_CHECK_LE(requestBytes_, maxGrowBytes_); | ||
} | ||
|
||
std::ostream& operator<<(std::ostream& out, ArbitrationOperation::State state) { | ||
out << ArbitrationOperation::stateName(state); | ||
return out; | ||
} | ||
} // namespace facebook::velox::memory |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#pragma once | ||
|
||
#include "velox/common/base/Counters.h" | ||
#include "velox/common/base/GTestMacros.h" | ||
#include "velox/common/base/StatsReporter.h" | ||
#include "velox/common/future/VeloxPromise.h" | ||
#include "velox/common/memory/ArbitrationParticipant.h" | ||
#include "velox/common/memory/Memory.h" | ||
|
||
namespace facebook::velox::memory { | ||
|
||
/// Manages the execution of a memory arbitration request within the arbitrator. | ||
class ArbitrationOperation { | ||
public: | ||
ArbitrationOperation( | ||
ScopedArbitrationParticipant&& pool, | ||
uint64_t requestBytes, | ||
uint64_t timeoutMs); | ||
|
||
~ArbitrationOperation(); | ||
|
||
enum class State { | ||
kInit, | ||
kWaiting, | ||
kRunning, | ||
kFinished, | ||
}; | ||
|
||
State state() const { | ||
return state_; | ||
} | ||
|
||
static std::string stateName(State state); | ||
|
||
/// Returns the corresponding arbitration participant. | ||
const ScopedArbitrationParticipant& participant() { | ||
return participant_; | ||
} | ||
|
||
/// Invoked to start arbitration execution on the arbitration participant. The | ||
/// latter ensures the serialized execution of arbitration operations from the | ||
/// same query with one at a time. So this method blocks until all the prior | ||
/// arbitration operations finish. | ||
void start(); | ||
|
||
/// Invoked to finish arbitration execution on the arbitration participant. It | ||
/// also resumes the next waiting arbitration operation to execute if there is | ||
/// one. | ||
void finish(); | ||
|
||
/// Returns true if the corresponding arbitration participant has been | ||
/// aborted. | ||
bool aborted() const; | ||
|
||
/// Invoked to set the grow targets for this arbitration operation based on | ||
/// the request size. | ||
/// | ||
/// NOTE: this should be called once after the arbitration operation is | ||
/// started. | ||
void setGrowTargets(); | ||
|
||
uint64_t requestBytes() const { | ||
return requestBytes_; | ||
} | ||
|
||
/// Returns the max grow bytes for this arbitration operation which could be | ||
/// larger than the request bytes for exponential growth. | ||
uint64_t maxGrowBytes() const { | ||
return maxGrowBytes_; | ||
} | ||
|
||
/// Returns the min grow bytes for this arbitration operation to ensure the | ||
/// arbitration participant has the minimum amount of memory capacity. The | ||
/// arbitrator might allocate memory from the reserved memory capacity pool | ||
/// for the min grow bytes. | ||
uint64_t minGrowBytes() const { | ||
return minGrowBytes_; | ||
} | ||
|
||
/// Returns the allocated bytes by this arbitration operation. | ||
uint64_t& allocatedBytes() { | ||
return allocatedBytes_; | ||
} | ||
|
||
/// Returns the remaining execution time for this operation before time out. | ||
/// If the operation has already finished, this returns zero. | ||
size_t timeoutMs() const; | ||
|
||
/// Returns true if this operation has timed out. | ||
bool hasTimeout() const; | ||
|
||
/// Returns the execution time of this arbitration operation since creation. | ||
size_t executionTimeMs() const; | ||
|
||
/// Getters/Setters of the wait time in (local) arbitration paritcipant wait | ||
/// queue or (global) arbitrator request wait queue. | ||
void setLocalArbitrationWaitTimeUs(uint64_t waitTimeUs) { | ||
VELOX_CHECK_EQ(localArbitrationWaitTimeUs_, 0); | ||
VELOX_CHECK_EQ(state_, State::kWaiting); | ||
localArbitrationWaitTimeUs_ = waitTimeUs; | ||
} | ||
|
||
uint64_t localArbitrationWaitTimeUs() const { | ||
return localArbitrationWaitTimeUs_; | ||
} | ||
|
||
void setGlobalArbitrationWaitTimeUs(uint64_t waitTimeUs) { | ||
VELOX_CHECK_EQ(globalArbitrationWaitTimeUs_, 0); | ||
VELOX_CHECK_EQ(state_, State::kRunning); | ||
globalArbitrationWaitTimeUs_ = waitTimeUs; | ||
} | ||
|
||
uint64_t globalArbitrationWaitTimeUs() const { | ||
return globalArbitrationWaitTimeUs_; | ||
} | ||
|
||
private: | ||
void setState(State state); | ||
|
||
const uint64_t requestBytes_; | ||
const uint64_t timeoutMs_; | ||
|
||
// The start time of this arbitration operation. | ||
const uint64_t createTimeMs_; | ||
const ScopedArbitrationParticipant participant_; | ||
|
||
State state_{State::kInit}; | ||
|
||
uint64_t finishTimeMs_{0}; | ||
|
||
uint64_t maxGrowBytes_{0}; | ||
uint64_t minGrowBytes_{0}; | ||
|
||
// The actual bytes allocated from arbitrator based on the request bytes and | ||
// grow targets. It is either zero on failure or between 'requestBytes_' and | ||
// 'maxGrowBytes_' on success. | ||
uint64_t allocatedBytes_{0}; | ||
|
||
// The time that waits in local arbitration queue. | ||
uint64_t localArbitrationWaitTimeUs_{0}; | ||
|
||
// The time that waits for global arbitration queue. | ||
uint64_t globalArbitrationWaitTimeUs_{0}; | ||
|
||
friend class ArbitrationParticipant; | ||
}; | ||
|
||
std::ostream& operator<<(std::ostream& out, ArbitrationOperation::State state); | ||
} // namespace facebook::velox::memory | ||
|
||
template <> | ||
struct fmt::formatter<facebook::velox::memory::ArbitrationOperation::State> | ||
: formatter<std::string> { | ||
auto format( | ||
facebook::velox::memory::ArbitrationOperation::State state, | ||
format_context& ctx) { | ||
return formatter<std::string>::format( | ||
facebook::velox::memory::ArbitrationOperation::stateName(state), ctx); | ||
} | ||
}; |
Oops, something went wrong.