Skip to content

Commit

Permalink
[VL] Respect spark.gluten.sql.debug in native side (#3748)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohahaha authored Nov 21, 2023
1 parent b22c862 commit 018da4c
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 59 deletions.
19 changes: 1 addition & 18 deletions cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,9 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
Runtime(const std::unordered_map<std::string, std::string>& confMap) : confMap_(confMap) {}
virtual ~Runtime() = default;

/// Convenience overload: forwards to the three-argument parsePlan, passing {-1, -1, -1}
/// as the SparkTaskInfo argument.
/// NOTE(review): -1 presumably marks "no task information available" - confirm against SparkTaskInfo.
/// @param data pointer to the serialized plan bytes.
/// @param size number of bytes available at `data`.
void parsePlan(const uint8_t* data, int32_t size) {
parsePlan(data, size, {-1, -1, -1});
}

/// Records the task info in taskInfo_ and parses the serialized plan into substraitPlan_.
/// This function returns nothing: a failed parse is reported through GLUTEN_CHECK with the
/// message "Parse substrait plan failed".
/// NOTE(review): GLUTEN_CHECK presumably throws on failure - confirm against the macro definition.
/// @param data pointer to the serialized plan bytes.
/// @param size number of bytes available at `data`.
/// @param taskInfo stored in taskInfo_; its stageId/partitionId/taskId are printed in debug builds.
void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) {
taskInfo_ = taskInfo;
// Debug dump of the plan as JSON; compiled in only when GLUTEN_PRINT_DEBUG is defined.
#ifdef GLUTEN_PRINT_DEBUG
try {
auto jsonPlan = substraitFromPbToJson("Plan", data, size);
DEBUG_OUT << std::string(50, '#') << " received substrait::Plan:" << std::endl;
DEBUG_OUT << "Task stageId: " << taskInfo_.stageId << ", partitionId: " << taskInfo_.partitionId
<< ", taskId: " << taskInfo_.taskId << "; " << jsonPlan << std::endl;
} catch (const std::exception& e) {
// A std::exception raised while producing the dump is written to stderr and swallowed,
// so the parse below still runs.
std::cerr << "Error converting Substrait plan to JSON: " << e.what() << std::endl;
}
#endif
GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed");
}
virtual void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) = 0;

virtual std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) = 0;

Expand Down
9 changes: 9 additions & 0 deletions cpp/core/config/GlutenConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,13 @@ std::unordered_map<std::string, std::string> parseConfMap(JNIEnv* env, jbyteArra

return sparkConfs;
}

/// Renders a configuration map as text suitable for appending to a log line.
/// The result begins with a newline and then holds one " [key, value]" line per entry,
/// each terminated by '\n'. Entries appear in the map's own iteration order.
/// @param conf configuration entries to render.
/// @return the rendered text; just "\n" when `conf` is empty.
std::string printConfig(const std::unordered_map<std::string, std::string>& conf) {
  std::string rendered = "\n";
  for (const auto& entry : conf) {
    rendered += " [";
    rendered += entry.first;
    rendered += ", ";
    rendered += entry.second;
    rendered += "]\n";
  }
  return rendered;
}
} // namespace gluten
3 changes: 3 additions & 0 deletions cpp/core/config/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
namespace gluten {

// store configurations that are general to all backend types
const std::string kDebugModeEnabled = "spark.gluten.sql.debug";

const std::string kGlutenSaveDir = "spark.gluten.saveDir";

Expand Down Expand Up @@ -56,4 +57,6 @@ const std::string kQatBackendName = "qat";
const std::string kIaaBackendName = "iaa";

std::unordered_map<std::string, std::string> parseConfMap(JNIEnv* env, jbyteArray configArray);

std::string printConfig(const std::unordered_map<std::string, std::string>& conf);
} // namespace gluten
2 changes: 1 addition & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ JNIEXPORT jstring JNICALL Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapp
auto planData = reinterpret_cast<const uint8_t*>(env->GetByteArrayElements(planArray, 0));
auto planSize = env->GetArrayLength(planArray);
auto ctx = gluten::getRuntime(env, wrapper);
ctx->parsePlan(planData, planSize);
ctx->parsePlan(planData, planSize, {});
auto& conf = ctx->getConfMap();
auto planString = ctx->planString(details, conf);
return env->NewStringUTF(planString.c_str());
Expand Down
5 changes: 3 additions & 2 deletions cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ auto BM_Generic = [](::benchmark::State& state,
setCpu(state.thread_index());
}
auto memoryManager = getDefaultMemoryManager();
auto runtime = Runtime::create(kVeloxRuntimeKind);
auto runtime = Runtime::create(kVeloxRuntimeKind, conf);
const auto& filePath = getExampleFilePath(substraitJsonFile);
auto plan = getPlanFromFile(filePath);
auto startTime = std::chrono::steady_clock::now();
Expand All @@ -146,7 +146,7 @@ auto BM_Generic = [](::benchmark::State& state,
});
}

runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size());
runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(), {});
auto resultIter =
runtime->createResultIterator(memoryManager.get(), "/tmp/test-spill", std::move(inputIters), conf);
auto veloxPlan = dynamic_cast<gluten::VeloxRuntime*>(runtime)->getVeloxPlan();
Expand Down Expand Up @@ -233,6 +233,7 @@ int main(int argc, char** argv) {
std::unordered_map<std::string, std::string> conf;

conf.insert({gluten::kSparkBatchSize, FLAGS_batch_size});
conf.insert({kDebugModeEnabled, "true"});
initVeloxBackend(conf);

try {
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/benchmarks/QueryBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ auto BM = [](::benchmark::State& state,
state.PauseTiming();
state.ResumeTiming();

runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size());
runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(), {});
std::shared_ptr<const facebook::velox::core::PlanNode> veloxPlan;
auto resultIter = getResultIterator(memoryManager.get(), runtime, scanInfos, veloxPlan);
auto outputSchema = toArrowSchema(veloxPlan->outputType(), defaultLeafVeloxMemoryPool().get());
Expand Down
19 changes: 5 additions & 14 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#ifdef ENABLE_GCS
#include <fstream>
#endif
#include "config/GlutenConfig.h"
#include "jni/JniFileSystem.h"
#include "udf/UdfLoader.h"
#include "utils/ConfigExtractor.h"
Expand Down Expand Up @@ -115,16 +116,6 @@ const bool kVeloxFileHandleCacheEnabledDefault = false;

namespace gluten {

/// Logs every key/value pair of the given Velox config at INFO level as a single message.
/// The message is framed by "STARTUP: VeloxBackend conf = {" and "}", with one
/// " {key, value}" line per entry taken from conf.valuesCopy().
void VeloxBackend::printConf(const facebook::velox::Config& conf) {
  const auto entries = conf.valuesCopy();

  std::ostringstream dump;
  dump << "STARTUP: VeloxBackend conf = {\n";
  for (const auto& entry : entries) {
    dump << " {" << entry.first << ", " << entry.second << "}\n";
  }
  dump << "}\n";

  LOG(INFO) << dump.str();
}

void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf) {
// Init glog and log level.
auto veloxmemcfg = std::make_shared<facebook::velox::core::MemConfigMutable>(conf);
Expand Down Expand Up @@ -257,10 +248,6 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
initCache(veloxcfg);
initIOExecutor(veloxcfg);

#ifdef GLUTEN_PRINT_DEBUG
printConf(*veloxcfg);
#endif

veloxmemcfg->setValue(
velox::connector::hive::HiveConfig::kEnableFileHandleCache,
veloxcfg->get<bool>(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false");
Expand All @@ -279,6 +266,10 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
velox::exec::Operator::registerOperator(std::make_unique<RowVectorStreamOperatorTranslator>());

initUdf(veloxcfg);

if (veloxcfg->get<bool>(kDebugModeEnabled, false)) {
LOG(INFO) << "VeloxBackend config:" << printConfig(veloxcfg->valuesCopy());
}
}

facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
Expand Down
2 changes: 0 additions & 2 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ class VeloxBackend {

void initJolFilesystem(const facebook::velox::Config* conf);

void printConf(const facebook::velox::Config& conf);

/// Builds a cache file name prefix of the form "cache.<uuid>.", where <uuid> is the
/// string form of a UUID produced by a boost::uuids::random_generator on each call.
std::string getCacheFilePrefix() {
  const auto uuid = boost::uuids::random_generator()();
  std::string prefix = "cache.";
  prefix += boost::lexical_cast<std::string>(uuid);
  prefix += ".";
  return prefix;
}
Expand Down
38 changes: 20 additions & 18 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,39 @@
*/

#include "VeloxRuntime.h"

#include <filesystem>

#include "arrow/c/bridge.h"
#include "VeloxBackend.h"
#include "compute/ResultIterator.h"
#include "compute/Runtime.h"
#include "compute/VeloxPlanConverter.h"
#include "config/GlutenConfig.h"
#include "operators/serializer/VeloxRowToColumnarConverter.h"
#include "shuffle/VeloxShuffleReader.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "utils/ConfigExtractor.h"

using namespace facebook;

namespace gluten {

namespace {
VeloxRuntime::VeloxRuntime(const std::unordered_map<std::string, std::string>& confMap) : Runtime(confMap) {}

#ifdef GLUTEN_PRINT_DEBUG
void printSessionConf(const std::unordered_map<std::string, std::string>& conf) {
std::ostringstream oss;
oss << "session conf = {\n";
for (auto& [k, v] : conf) {
oss << " {" << k << " = " << v << "}\n";
void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) {
taskInfo_ = taskInfo;
if (getConfigValue(confMap_, kDebugModeEnabled, "false") == "true") {
try {
auto jsonPlan = substraitFromPbToJson("Plan", data, size);
LOG(INFO) << std::string(50, '#') << " received substrait::Plan:";
LOG(INFO) << taskInfo_ << std::endl << jsonPlan;
} catch (const std::exception& e) {
LOG(WARNING) << "Error converting Substrait plan to JSON: " << e.what();
}
}
oss << "}\n";
LOG(INFO) << oss.str();
}
#endif

} // namespace

VeloxRuntime::VeloxRuntime(const std::unordered_map<std::string, std::string>& confMap) : Runtime(confMap) {}
GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed");
}

void VeloxRuntime::getInfoAndIds(
const std::unordered_map<velox::core::PlanNodeId, std::shared_ptr<SplitInfo>>& splitInfoMap,
Expand Down Expand Up @@ -82,9 +84,9 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
const std::string& spillDir,
const std::vector<std::shared_ptr<ResultIterator>>& inputs,
const std::unordered_map<std::string, std::string>& sessionConf) {
#ifdef GLUTEN_PRINT_DEBUG
printSessionConf(sessionConf);
#endif
if (getConfigValue(confMap_, kDebugModeEnabled, "false") == "true") {
LOG(INFO) << "VeloxRuntime session config:" << printConfig(confMap_);
}

VeloxPlanConverter veloxPlanConverter(inputs, getLeafVeloxPool(memoryManager).get(), sessionConf);
veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_);
Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/compute/VeloxRuntime.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
#include "operators/writer/VeloxParquetDatasource.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "shuffle/VeloxShuffleReader.h"
#include "utils/ResourceMap.h"

namespace gluten {

Expand All @@ -37,6 +35,8 @@ class VeloxRuntime final : public Runtime {
public:
explicit VeloxRuntime(const std::unordered_map<std::string, std::string>& confMap);

void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) override;

/// Returns the aggregate memory pool of the Velox memory manager that
/// toVeloxMemoryManager() yields for `memoryManager`.
static std::shared_ptr<facebook::velox::memory::MemoryPool> getAggregateVeloxPool(MemoryManager* memoryManager) {
  auto&& veloxMemoryManager = toVeloxMemoryManager(memoryManager);
  return veloxMemoryManager->getAggregateMemoryPool();
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/tests/RuntimeTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class DummyRuntime final : public Runtime {
public:
DummyRuntime(const std::unordered_map<std::string, std::string>& conf) : Runtime(conf) {}

// No-op override: the body is empty, so `data`, `size` and `taskInfo` are ignored.
void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) override {}

std::shared_ptr<ResultIterator> createResultIterator(
MemoryManager* memoryManager,
const std::string& spillDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ object GlutenConfig {
// Pass through to native conf
val GLUTEN_SAVE_DIR = "spark.gluten.saveDir"

val GLUTEN_DEBUG_MODE = "spark.gluten.sql.debug"

// Added back to Spark Conf during executor initialization
val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.offHeap.size.in.bytes"
val GLUTEN_CONSERVATIVE_OFFHEAP_SIZE_IN_BYTES_KEY =
Expand Down Expand Up @@ -426,6 +428,7 @@ object GlutenConfig {
conf: scala.collection.Map[String, String]): util.Map[String, String] = {
val nativeConfMap = new util.HashMap[String, String]()
val keys = ImmutableList.of(
GLUTEN_DEBUG_MODE,
GLUTEN_SAVE_DIR,
GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
GLUTEN_MAX_BATCH_SIZE_KEY,
Expand Down Expand Up @@ -507,6 +510,7 @@ object GlutenConfig {
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))

val keys = ImmutableList.of(
GLUTEN_DEBUG_MODE,
// datasource config
SPARK_SQL_PARQUET_COMPRESSION_CODEC,
// datasource config end
Expand Down Expand Up @@ -1162,7 +1166,7 @@ object GlutenConfig {
.createWithDefault("DEBUG")

val DEBUG_LEVEL_ENABLED =
buildConf("spark.gluten.sql.debug")
buildConf(GLUTEN_DEBUG_MODE)
.internal()
.booleanConf
.createWithDefault(false)
Expand Down

0 comments on commit 018da4c

Please sign in to comment.