Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Nov 13, 2023
1 parent 0818e37 commit 119d735
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 92 deletions.
165 changes: 74 additions & 91 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,79 +186,13 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
velox::filesystems::registerLocalFileSystem();
initJolFilesystem(conf);

std::unordered_map<std::string, std::string> configurationValues;

#ifdef ENABLE_S3
std::string awsAccessKey = conf.at("spark.hadoop.fs.s3a.access.key");
std::string awsSecretKey = conf.at("spark.hadoop.fs.s3a.secret.key");
std::string awsEndpoint = conf.at("spark.hadoop.fs.s3a.endpoint");
std::string sslEnabled = conf.at("spark.hadoop.fs.s3a.connection.ssl.enabled");
std::string pathStyleAccess = conf.at("spark.hadoop.fs.s3a.path.style.access");
std::string useInstanceCredentials = conf.at("spark.hadoop.fs.s3a.use.instance.credentials");
std::string iamRole = conf.at("spark.hadoop.fs.s3a.iam.role");
std::string iamRoleSessionName = conf.at("spark.hadoop.fs.s3a.iam.role.session.name");

const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
if (envAwsAccessKey != nullptr) {
awsAccessKey = std::string(envAwsAccessKey);
}
const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
if (envAwsSecretKey != nullptr) {
awsSecretKey = std::string(envAwsSecretKey);
}
const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}

std::unordered_map<std::string, std::string> s3Config({});
if (useInstanceCredentials == "true") {
s3Config.insert({
{"hive.s3.use-instance-credentials", useInstanceCredentials},
});
} else if (!iamRole.empty()) {
s3Config.insert({
{"hive.s3.iam-role", iamRole},
});
if (!iamRoleSessionName.empty()) {
s3Config.insert({
{"hive.s3.iam-role-session-name", iamRoleSessionName},
});
}
} else {
s3Config.insert({
{"hive.s3.aws-access-key", awsAccessKey},
{"hive.s3.aws-secret-key", awsSecretKey},
});
}
// Only need to set s3 endpoint when not use instance credentials.
if (useInstanceCredentials != "true") {
s3Config.insert({
{"hive.s3.endpoint", awsEndpoint},
});
}
s3Config.insert({
{"hive.s3.ssl.enabled", sslEnabled},
{"hive.s3.path-style-access", pathStyleAccess},
});

configurationValues.merge(s3Config);
#endif

initCache(conf);
initConnectorFactory(conf);
initConnector(conf);

#ifdef GLUTEN_PRINT_DEBUG
printConf(conf);
#endif

auto properties = std::make_shared<const velox::core::MemConfig>(configurationValues);
auto hiveConnector =
velox::connector::getConnectorFactory(velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)
->newConnector(kHiveConnectorId, properties, ioExecutor_.get());

registerConnector(hiveConnector);

// Register Velox functions
registerAllFunctions();
if (!facebook::velox::isRegisteredVectorSerde()) {
Expand Down Expand Up @@ -385,40 +319,89 @@ class MemoryPoolReplacedHiveConnector : public velox::connector::hive::HiveConne
private:
velox::memory::MemoryPool* pool_;
};

/// Hive connector factory whose connectors are MemoryPoolReplacedHiveConnector instances.
///
/// Every connector produced by newConnector() is constructed with the memory pool pointer that
/// was handed to this factory, in addition to the usual id / properties / executor arguments.
class MemoryPoolReplacedHiveConnectorFactory : public velox::connector::hive::HiveConnectorFactory {
 public:
  /// @param pool Memory pool pointer forwarded to each created connector. The factory only
  ///             stores the raw pointer and never frees it, so the caller presumably has to keep
  ///             the pool alive for as long as the factory (and its connectors) are in use.
  ///
  /// Marked explicit so that a bare MemoryPool* is never silently converted into a factory.
  explicit MemoryPoolReplacedHiveConnectorFactory(velox::memory::MemoryPool* pool) : pool_{pool} {}

  /// Creates a MemoryPoolReplacedHiveConnector bound to the stored pool.
  ///
  /// @param id Identifier passed through to the connector.
  /// @param properties Connector configuration passed through to the connector.
  /// @param executor Executor passed through to the connector.
  /// @return The newly created connector as a shared pointer to the Connector base type.
  std::shared_ptr<velox::connector::Connector> newConnector(
      const std::string& id,
      std::shared_ptr<const velox::Config> properties,
      folly::Executor* executor) override {
    return std::make_shared<MemoryPoolReplacedHiveConnector>(pool_, id, properties, executor);
  }

 private:
  // Pool pointer handed to every connector this factory creates; not owned by this class.
  velox::memory::MemoryPool* pool_;
};
} // namespace

void VeloxBackend::initConnectorFactory(const std::unordered_map<std::string, std::string>& conf) {
void VeloxBackend::initConnector(const std::unordered_map<std::string, std::string>& conf) {
int32_t ioThreads = std::stoi(getConfigValue(conf, kVeloxIOThreads, kVeloxIOThreadsDefault));
int32_t splitPreloadPerDriver =
std::stoi(getConfigValue(conf, kVeloxSplitPreloadPerDriver, kVeloxSplitPreloadPerDriverDefault));

if (splitPreloadPerDriver > 0 && ioThreads > 0) {
LOG(INFO) << "STARTUP: Using split preloading, Split preload per driver: " << splitPreloadPerDriver
<< ", IO threads: " << ioThreads;
}

std::unordered_map<std::string, std::string> configurationValues;

#ifdef ENABLE_S3
std::string awsAccessKey = conf.at("spark.hadoop.fs.s3a.access.key");
std::string awsSecretKey = conf.at("spark.hadoop.fs.s3a.secret.key");
std::string awsEndpoint = conf.at("spark.hadoop.fs.s3a.endpoint");
std::string sslEnabled = conf.at("spark.hadoop.fs.s3a.connection.ssl.enabled");
std::string pathStyleAccess = conf.at("spark.hadoop.fs.s3a.path.style.access");
std::string useInstanceCredentials = conf.at("spark.hadoop.fs.s3a.use.instance.credentials");
std::string iamRole = conf.at("spark.hadoop.fs.s3a.iam.role");
std::string iamRoleSessionName = conf.at("spark.hadoop.fs.s3a.iam.role.session.name");

const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
if (envAwsAccessKey != nullptr) {
awsAccessKey = std::string(envAwsAccessKey);
}
const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
if (envAwsSecretKey != nullptr) {
awsSecretKey = std::string(envAwsSecretKey);
}
const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
if (envAwsEndpoint != nullptr) {
awsEndpoint = std::string(envAwsEndpoint);
}

std::unordered_map<std::string, std::string> s3Config({});
if (useInstanceCredentials == "true") {
s3Config.insert({
{"hive.s3.use-instance-credentials", useInstanceCredentials},
});
} else if (!iamRole.empty()) {
s3Config.insert({
{"hive.s3.iam-role", iamRole},
});
if (!iamRoleSessionName.empty()) {
s3Config.insert({
{"hive.s3.iam-role-session-name", iamRoleSessionName},
});
}
} else {
s3Config.insert({
{"hive.s3.aws-access-key", awsAccessKey},
{"hive.s3.aws-secret-key", awsSecretKey},
});
}
// Only need to set s3 endpoint when not use instance credentials.
if (useInstanceCredentials != "true") {
s3Config.insert({
{"hive.s3.endpoint", awsEndpoint},
});
}
s3Config.insert({
{"hive.s3.ssl.enabled", sslEnabled},
{"hive.s3.path-style-access", pathStyleAccess},
});

configurationValues.merge(s3Config);
#endif

auto properties = std::make_shared<const velox::core::MemConfig>(configurationValues);

if (ioThreads > 0) {
ioExecutor_ = std::make_unique<folly::IOThreadPoolExecutor>(ioThreads);
FLAGS_split_preload_per_driver = splitPreloadPerDriver;
// Use global memory pool for hive connector if SplitPreloadPerDriver was enabled. Otherwise the allocated memory
// Use global memory pool for hive connector if SplitPreloadPerDriver was enabled. Otherwise, the allocated memory
// blocks might cause unexpected behavior (e.g. crash) since the allocations are performed in background IO threads.
velox::connector::registerConnectorFactory(
std::make_shared<MemoryPoolReplacedHiveConnectorFactory>(gluten::defaultLeafVeloxMemoryPool().get()));
velox::connector::registerConnector(std::make_shared<MemoryPoolReplacedHiveConnector>(
gluten::defaultLeafVeloxMemoryPool().get(), kHiveConnectorId, properties, ioExecutor_.get()));
} else {
velox::connector::registerConnectorFactory(std::make_shared<velox::connector::hive::HiveConnectorFactory>());
}

if (splitPreloadPerDriver > 0 && ioThreads > 0) {
LOG(INFO) << "STARTUP: Using split preloading, Split preload per driver: " << splitPreloadPerDriver
<< ", IO threads: " << ioThreads;
velox::connector::registerConnector(
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, properties, ioExecutor_.get()));
}
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class VeloxBackend {

void init(const std::unordered_map<std::string, std::string>& conf);
void initCache(const std::unordered_map<std::string, std::string>& conf);
void initConnectorFactory(const std::unordered_map<std::string, std::string>& conf);
void initConnector(const std::unordered_map<std::string, std::string>& conf);
void initUdf(const std::unordered_map<std::string, std::string>& conf);

void initJolFilesystem(const std::unordered_map<std::string, std::string>& conf);
Expand Down

0 comments on commit 119d735

Please sign in to comment.