Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-7800][VL] Add config for max reclaim wait time to avoid dead lock when memory arbitration #7799

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ const bool kMemoryUseHugePagesDefault = false;
const std::string kVeloxMemInitCapacity = "spark.gluten.sql.columnar.backend.velox.memInitCapacity";
const uint64_t kVeloxMemInitCapacityDefault = 8 << 20;

const std::string kVeloxMemReclaimMaxWaitMs = "spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs";
const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min

const std::string kHiveConnectorId = "test-hive";
const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled";

Expand Down
1 change: 0 additions & 1 deletion cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapp
jlong dsHandle,
jlong batchHandle) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);
auto datasource = ObjectStore::retrieve<VeloxDataSource>(dsHandle);
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
datasource->write(batch);
Expand Down
42 changes: 23 additions & 19 deletions cpp/velox/memory/VeloxMemoryManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "velox/common/memory/MallocAllocator.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/exec/MemoryReclaimer.h"
Comment on lines +25 to 26
Copy link
Member

@zhztheplayer zhztheplayer Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to include SharedArbitrator.h? Thought we should avoid this kind of dependency. One of the reason is, code of the shared arbitrator is usually updated frequently so we may easily find our code doesn't work after a Velox rebase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just want reuse velox::memory::SharedArbitrator::ExtraConfig, would you give some input?


#include "compute/VeloxBackend.h"
Expand All @@ -36,12 +37,6 @@ namespace gluten {
using namespace facebook;

namespace {

static constexpr std::string_view kMemoryPoolInitialCapacity{"memory-pool-initial-capacity"};
static constexpr uint64_t kDefaultMemoryPoolInitialCapacity{256 << 20};
static constexpr std::string_view kMemoryPoolTransferCapacity{"memory-pool-transfer-capacity"};
static constexpr uint64_t kDefaultMemoryPoolTransferCapacity{128 << 20};

template <typename T>
T getConfig(
const std::unordered_map<std::string, std::string>& configs,
Expand All @@ -57,24 +52,28 @@ T getConfig(
return defaultValue;
}
} // namespace

/// We assume in a single Spark task. No thread-safety should be guaranteed.
class ListenableArbitrator : public velox::memory::MemoryArbitrator {
public:
ListenableArbitrator(const Config& config, AllocationListener* listener)
: MemoryArbitrator(config),
listener_(listener),
memoryPoolInitialCapacity_(velox::config::toCapacity(
getConfig<std::string>(
config.extraConfigs,
kMemoryPoolInitialCapacity,
std::to_string(kDefaultMemoryPoolInitialCapacity)),
velox::config::CapacityUnit::BYTE)),
reclaimMaxWaitMs_(
velox::memory::SharedArbitrator::ExtraConfig::memoryReclaimMaxWaitTimeMs(config.extraConfigs)),
memoryPoolInitialCapacity_(
velox::memory::SharedArbitrator::ExtraConfig::memoryPoolInitialCapacity(config.extraConfigs)),
memoryPoolTransferCapacity_(velox::config::toCapacity(
getConfig<std::string>(
config.extraConfigs,
kMemoryPoolTransferCapacity,
std::to_string(kDefaultMemoryPoolTransferCapacity)),
velox::config::CapacityUnit::BYTE)) {}
kMemoryReservationBlockSize,
std::to_string(kMemoryReservationBlockSizeDefault)),
velox::config::CapacityUnit::BYTE)) {
if (reclaimMaxWaitMs_ == 0) {
LOG(WARNING) << kVeloxMemReclaimMaxWaitMs
<< " was set to 0, it may cause dead lock when memory arbitration has bug.";
}
}
std::string kind() const override {
return kind_;
}
Expand Down Expand Up @@ -121,7 +120,7 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator {
VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool");
pool = candidates_.begin()->first;
}
pool->reclaim(targetBytes, 0, status); // ignore the output
pool->reclaim(targetBytes, reclaimMaxWaitMs_, status); // ignore the output
return shrinkCapacity0(pool, 0);
}

Expand Down Expand Up @@ -168,6 +167,7 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator {
}

gluten::AllocationListener* listener_;
const uint64_t reclaimMaxWaitMs_;
const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused.
const uint64_t memoryPoolTransferCapacity_;

Expand Down Expand Up @@ -208,14 +208,18 @@ VeloxMemoryManager::VeloxMemoryManager(const std::string& kind, std::unique_ptr<
kMemoryReservationBlockSize, kMemoryReservationBlockSizeDefault);
auto memInitCapacity =
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemInitCapacity, kVeloxMemInitCapacityDefault);
auto memReclaimMaxWaitMs =
VeloxBackend::get()->getBackendConf()->get<uint64_t>(kVeloxMemReclaimMaxWaitMs, kVeloxMemReclaimMaxWaitMsDefault);
blockListener_ = std::make_unique<BlockAllocationListener>(listener_.get(), reservationBlockSize);
listenableAlloc_ = std::make_unique<ListenableMemoryAllocator>(defaultMemoryAllocator().get(), blockListener_.get());
arrowPool_ = std::make_unique<ArrowMemoryPool>(listenableAlloc_.get());

std::unordered_map<std::string, std::string> extraArbitratorConfigs;
extraArbitratorConfigs["memory-pool-initial-capacity"] = folly::to<std::string>(memInitCapacity) + "B";
extraArbitratorConfigs["memory-pool-transfer-capacity"] = folly::to<std::string>(reservationBlockSize) + "B";
extraArbitratorConfigs["memory-reclaim-max-wait-time"] = folly::to<std::string>(0) + "ms";
extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryPoolInitialCapacity)] =
folly::to<std::string>(memInitCapacity) + "B";
extraArbitratorConfigs[kMemoryReservationBlockSize] = folly::to<std::string>(reservationBlockSize) + "B";
extraArbitratorConfigs[std::string(velox::memory::SharedArbitrator::ExtraConfig::kMemoryReclaimMaxWaitTime)] =
folly::to<std::string>(memReclaimMaxWaitMs) + "ms";

ArbitratorFactoryRegister afr(listener_.get());
velox::memory::MemoryManagerOptions mmOptions{
Expand Down
1 change: 0 additions & 1 deletion cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ arrow::Status VeloxRssSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb
}

arrow::Status VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) {
int64_t rawSize = batch_->size();
bufferOutputStream_->seekp(0);
batch_->flush(bufferOutputStream_.get());
auto buffer = bufferOutputStream_->getBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,13 @@ object GlutenConfig {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8MB")

val COLUMNAR_VELOX_MEM_RECLAIM_MAX_WAIT_MS =
buildConf("spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs")
.internal()
.doc("The max time in ms to wait for memory reclaim.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.MINUTES.toMillis(60))

val COLUMNAR_VELOX_SSD_CACHE_PATH =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.ssdCachePath")
.internal()
Expand Down
Loading