Skip to content

Commit

Permalink
Move reclaimer guard to velox/common
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Mar 6, 2024
1 parent 6d9d026 commit c748106
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 64 deletions.
40 changes: 40 additions & 0 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <vector>

#include "velox/common/base/Exceptions.h"
#include "velox/common/base/Portability.h"
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/common/future/VeloxPromise.h"
#include "velox/common/time/Timer.h"
Expand Down Expand Up @@ -356,6 +357,45 @@ class MemoryReclaimer {
MemoryReclaimer() = default;
};

/// The object is used to set/clear non-reclaimable section of an operation in
/// the middle of its execution. It allows the memory arbitrator to reclaim
/// memory from a running operator which is waiting for memory arbitration.
/// 'nonReclaimableSection' points to the corresponding flag of the associated
/// operator.
class ReclaimableSectionGuard {
public:
explicit ReclaimableSectionGuard(tsan_atomic<bool>* nonReclaimableSection)
: nonReclaimableSection_(nonReclaimableSection),
oldNonReclaimableSectionValue_(*nonReclaimableSection_) {
*nonReclaimableSection_ = false;
}

~ReclaimableSectionGuard() {
*nonReclaimableSection_ = oldNonReclaimableSectionValue_;
}

private:
tsan_atomic<bool>* const nonReclaimableSection_;
const bool oldNonReclaimableSectionValue_;
};

class NonReclaimableSectionGuard {
public:
explicit NonReclaimableSectionGuard(tsan_atomic<bool>* nonReclaimableSection)
: nonReclaimableSection_(nonReclaimableSection),
oldNonReclaimableSectionValue_(*nonReclaimableSection_) {
*nonReclaimableSection_ = true;
}

~NonReclaimableSectionGuard() {
*nonReclaimableSection_ = oldNonReclaimableSectionValue_;
}

private:
tsan_atomic<bool>* const nonReclaimableSection_;
const bool oldNonReclaimableSectionValue_;
};

/// The memory arbitration context which is set on per-thread local variable by
/// memory arbitrator. It is used to indicate a running thread is under memory
/// arbitration processing or not. This helps to enable sanity check such as all
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ std::shared_ptr<memory::MemoryPool> createSortPool(
return writerPool->addLeafChild(fmt::format("{}.sort", writerPool->name()));
}

#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \
exec::NonReclaimableSectionGuard nonReclaimableGuard( \
#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \
memory::NonReclaimableSectionGuard nonReclaimableGuard( \
writerInfo_[(index)]->nonReclaimableSectionHolder.get())

} // namespace
Expand Down
10 changes: 5 additions & 5 deletions velox/dwio/dwrf/test/E2EWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimOnWrite) {
// Expect a throw if we don't set the non-reclaimable section.
VELOX_ASSERT_THROW(writer->write(vectors[0]), "");
{
exec::NonReclaimableSectionGuard nonReclaimableGuard(
memory::NonReclaimableSectionGuard nonReclaimableGuard(
&nonReclaimableSection);
for (size_t i = 0; i < vectors.size(); ++i) {
writer->write(vectors[i]);
Expand Down Expand Up @@ -1753,7 +1753,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimOnFlush) {
}));

{
exec::NonReclaimableSectionGuard nonReclaimableGuard(
memory::NonReclaimableSectionGuard nonReclaimableGuard(
&nonReclaimableSection);
for (size_t i = 0; i < vectors.size(); ++i) {
writer->write(vectors[i]);
Expand Down Expand Up @@ -1833,7 +1833,7 @@ TEST_F(E2EWriterTest, memoryReclaimAfterClose) {
writer->flush();

{
exec::NonReclaimableSectionGuard nonReclaimableGuard(
memory::NonReclaimableSectionGuard nonReclaimableGuard(
&nonReclaimableSection);
for (size_t i = 0; i < vectors.size(); ++i) {
writer->write(vectors[i]);
Expand Down Expand Up @@ -1926,7 +1926,7 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimDuringInit) {

std::unique_ptr<dwrf::Writer> writer;
{
exec::NonReclaimableSectionGuard nonReclaimableGuard(
memory::NonReclaimableSectionGuard nonReclaimableGuard(
&nonReclaimableSection);
std::thread writerThread([&]() {
writer =
Expand Down Expand Up @@ -1989,7 +1989,7 @@ TEST_F(E2EWriterTest, memoryReclaimThreshold) {
std::make_unique<dwrf::Writer>(std::move(sink), options, dwrfPool);

{
exec::NonReclaimableSectionGuard nonReclaimableGuard(
memory::NonReclaimableSectionGuard nonReclaimableGuard(
&nonReclaimableSection);
for (size_t i = 0; i < vectors.size(); ++i) {
writer->write(vectors[i]);
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/dwrf/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void Writer::ensureWriteFits(size_t appendBytes, size_t appendRows) {

// Allows the memory arbitrator to reclaim memory from this file writer if the
// memory reservation below has triggered memory arbitration.
exec::ReclaimableSectionGuard reclaimGuard(nonReclaimableSection_);
memory::ReclaimableSectionGuard reclaimGuard(nonReclaimableSection_);

const size_t estimatedAppendMemoryBytes =
std::max(appendBytes, context.estimateNextWriteSize(appendRows));
Expand Down Expand Up @@ -254,7 +254,7 @@ void Writer::ensureStripeFlushFits() {

// Allows the memory arbitrator to reclaim memory from this file writer if the
// memory reservation below has triggered memory arbitration.
exec::ReclaimableSectionGuard reclaimGuard(nonReclaimableSection_);
memory::ReclaimableSectionGuard reclaimGuard(nonReclaimableSection_);

auto& context = getContext();
const size_t estimateFlushMemoryBytes =
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ void GroupingSet::ensureInputFits(const RowVectorPtr& input) {
incrementBytes * 2,
currentUsage * spillConfig_->spillableReservationGrowthPct / 100);
{
ReclaimableSectionGuard guard(nonReclaimableSection_);
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_.maybeReserve(targetIncrementBytes)) {
return;
}
Expand Down Expand Up @@ -901,7 +901,7 @@ void GroupingSet::ensureOutputFits() {
const uint64_t outputBufferSizeToReserve =
queryConfig_.preferredOutputBatchBytes() * 1.2;
{
ReclaimableSectionGuard guard(nonReclaimableSection_);
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_.maybeReserve(outputBufferSizeToReserve)) {
return;
}
Expand Down
41 changes: 0 additions & 41 deletions velox/exec/MemoryReclaimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#pragma once

#include "velox/common/base/Exceptions.h"
#include "velox/common/base/Portability.h"
#include "velox/common/memory/MemoryArbitrator.h"

namespace facebook::velox::exec {
Expand Down Expand Up @@ -47,43 +45,4 @@ class MemoryReclaimer : public memory::MemoryReclaimer {
/// drivers to go off thread. A suspended driver thread is not counted as
/// running.
void memoryArbitrationStateCheck(memory::MemoryPool& pool);

/// The object is used to set/clear non-reclaimable section of an operation in
/// the middle of its execution. It allows the memory arbitrator to reclaim
/// memory from a running operator which is waiting for memory arbitration.
/// 'nonReclaimableSection' points to the corresponding flag of the associated
/// operator.
class ReclaimableSectionGuard {
public:
explicit ReclaimableSectionGuard(tsan_atomic<bool>* nonReclaimableSection)
: nonReclaimableSection_(nonReclaimableSection),
oldNonReclaimableSectionValue_(*nonReclaimableSection_) {
*nonReclaimableSection_ = false;
}

~ReclaimableSectionGuard() {
*nonReclaimableSection_ = oldNonReclaimableSectionValue_;
}

private:
tsan_atomic<bool>* const nonReclaimableSection_;
const bool oldNonReclaimableSectionValue_;
};

class NonReclaimableSectionGuard {
public:
explicit NonReclaimableSectionGuard(tsan_atomic<bool>* nonReclaimableSection)
: nonReclaimableSection_(nonReclaimableSection),
oldNonReclaimableSectionValue_(*nonReclaimableSection_) {
*nonReclaimableSection_ = true;
}

~NonReclaimableSectionGuard() {
*nonReclaimableSection_ = oldNonReclaimableSectionValue_;
}

private:
tsan_atomic<bool>* const nonReclaimableSection_;
const bool oldNonReclaimableSectionValue_;
};
} // namespace facebook::velox::exec
2 changes: 1 addition & 1 deletion velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ void SortBuffer::ensureInputFits(const VectorPtr& input) {
estimatedIncrementalBytes * 2,
currentMemoryUsage * spillConfig_->spillableReservationGrowthPct / 100);
{
exec::ReclaimableSectionGuard guard(nonReclaimableSection_);
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_->maybeReserve(targetIncrementBytes)) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void SortWindowBuild::ensureInputFits(const RowVectorPtr& input) {
incrementBytes * 2,
currentUsage * spillConfig_->spillableReservationGrowthPct / 100);
{
ReclaimableSectionGuard guard(nonReclaimableSection_);
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (data_->pool()->maybeReserve(targetIncrementBytes)) {
return;
}
Expand Down
20 changes: 10 additions & 10 deletions velox/exec/tests/MemoryReclaimerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

#include "velox/exec/MemoryReclaimer.h"
#include "velox/common/memory/MemoryArbitrator.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/exec/tests/utils/OperatorTestBase.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"
Expand Down Expand Up @@ -116,16 +116,16 @@ TEST_F(MemoryReclaimerTest, abortTest) {
TEST(ReclaimableSectionGuard, basic) {
tsan_atomic<bool> nonReclaimableSection{false};
{
NonReclaimableSectionGuard guard(&nonReclaimableSection);
memory::NonReclaimableSectionGuard guard(&nonReclaimableSection);
ASSERT_TRUE(nonReclaimableSection);
{
ReclaimableSectionGuard guard(&nonReclaimableSection);
memory::ReclaimableSectionGuard guard(&nonReclaimableSection);
ASSERT_FALSE(nonReclaimableSection);
{
ReclaimableSectionGuard guard(&nonReclaimableSection);
memory::ReclaimableSectionGuard guard(&nonReclaimableSection);
ASSERT_FALSE(nonReclaimableSection);
{
NonReclaimableSectionGuard guard(&nonReclaimableSection);
memory::NonReclaimableSectionGuard guard(&nonReclaimableSection);
ASSERT_TRUE(nonReclaimableSection);
}
ASSERT_FALSE(nonReclaimableSection);
Expand All @@ -137,21 +137,21 @@ TEST(ReclaimableSectionGuard, basic) {
ASSERT_FALSE(nonReclaimableSection);
nonReclaimableSection = true;
{
ReclaimableSectionGuard guard(&nonReclaimableSection);
memory::ReclaimableSectionGuard guard(&nonReclaimableSection);
ASSERT_FALSE(nonReclaimableSection);
{
NonReclaimableSectionGuard guard(&nonReclaimableSection);
memory::NonReclaimableSectionGuard guard(&nonReclaimableSection);
ASSERT_TRUE(nonReclaimableSection);
{
ReclaimableSectionGuard guard(&nonReclaimableSection);
memory::ReclaimableSectionGuard guard(&nonReclaimableSection);
ASSERT_FALSE(nonReclaimableSection);
{
ReclaimableSectionGuard guard(&nonReclaimableSection);
memory::ReclaimableSectionGuard guard(&nonReclaimableSection);
ASSERT_FALSE(nonReclaimableSection);
}
ASSERT_FALSE(nonReclaimableSection);
{
NonReclaimableSectionGuard guard(&nonReclaimableSection);
memory::NonReclaimableSectionGuard guard(&nonReclaimableSection);
ASSERT_TRUE(nonReclaimableSection);
}
ASSERT_FALSE(nonReclaimableSection);
Expand Down

0 comments on commit c748106

Please sign in to comment.