diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index f03af3bae384..8e74bc941c33 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -186,79 +186,13 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
   velox::filesystems::registerLocalFileSystem();
   initJolFilesystem(conf);
 
-  std::unordered_map<std::string, std::string> configurationValues;
-
-#ifdef ENABLE_S3
-  std::string awsAccessKey = conf.at("spark.hadoop.fs.s3a.access.key");
-  std::string awsSecretKey = conf.at("spark.hadoop.fs.s3a.secret.key");
-  std::string awsEndpoint = conf.at("spark.hadoop.fs.s3a.endpoint");
-  std::string sslEnabled = conf.at("spark.hadoop.fs.s3a.connection.ssl.enabled");
-  std::string pathStyleAccess = conf.at("spark.hadoop.fs.s3a.path.style.access");
-  std::string useInstanceCredentials = conf.at("spark.hadoop.fs.s3a.use.instance.credentials");
-  std::string iamRole = conf.at("spark.hadoop.fs.s3a.iam.role");
-  std::string iamRoleSessionName = conf.at("spark.hadoop.fs.s3a.iam.role.session.name");
-
-  const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
-  if (envAwsAccessKey != nullptr) {
-    awsAccessKey = std::string(envAwsAccessKey);
-  }
-  const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
-  if (envAwsSecretKey != nullptr) {
-    awsSecretKey = std::string(envAwsSecretKey);
-  }
-  const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
-  if (envAwsEndpoint != nullptr) {
-    awsEndpoint = std::string(envAwsEndpoint);
-  }
-
-  std::unordered_map<std::string, std::string> s3Config({});
-  if (useInstanceCredentials == "true") {
-    s3Config.insert({
-        {"hive.s3.use-instance-credentials", useInstanceCredentials},
-    });
-  } else if (!iamRole.empty()) {
-    s3Config.insert({
-        {"hive.s3.iam-role", iamRole},
-    });
-    if (!iamRoleSessionName.empty()) {
-      s3Config.insert({
-          {"hive.s3.iam-role-session-name", iamRoleSessionName},
-      });
-    }
-  } else {
-    s3Config.insert({
-        {"hive.s3.aws-access-key", awsAccessKey},
-        {"hive.s3.aws-secret-key", awsSecretKey},
-    });
-  }
-  // Only need to set s3 endpoint when not use instance credentials.
-  if (useInstanceCredentials != "true") {
-    s3Config.insert({
-        {"hive.s3.endpoint", awsEndpoint},
-    });
-  }
-  s3Config.insert({
-      {"hive.s3.ssl.enabled", sslEnabled},
-      {"hive.s3.path-style-access", pathStyleAccess},
-  });
-
-  configurationValues.merge(s3Config);
-#endif
-
   initCache(conf);
-  initConnectorFactory(conf);
+  initConnector(conf);
 #ifdef GLUTEN_PRINT_DEBUG
   printConf(conf);
 #endif
 
-  auto properties = std::make_shared<const velox::core::MemConfig>(configurationValues);
-  auto hiveConnector =
-      velox::connector::getConnectorFactory(velox::connector::hive::HiveConnectorFactory::kHiveConnectorName)
-          ->newConnector(kHiveConnectorId, properties, ioExecutor_.get());
-
-  registerConnector(hiveConnector);
-
   // Register Velox functions
   registerAllFunctions();
   if (!facebook::velox::isRegisteredVectorSerde()) {
@@ -385,40 +319,89 @@ class MemoryPoolReplacedHiveConnector : public velox::connector::hive::HiveConne
  private:
   velox::memory::MemoryPool* pool_;
 };
-
-class MemoryPoolReplacedHiveConnectorFactory : public velox::connector::hive::HiveConnectorFactory {
- public:
-  MemoryPoolReplacedHiveConnectorFactory(velox::memory::MemoryPool* pool) : pool_{pool} {}
-  std::shared_ptr<velox::connector::Connector> newConnector(
-      const std::string& id,
-      std::shared_ptr<const velox::Config> properties,
-      folly::Executor* executor) override {
-    return std::make_shared<MemoryPoolReplacedHiveConnector>(pool_, id, properties, executor);
-  }
-
- private:
-  velox::memory::MemoryPool* pool_;
-};
 } // namespace
 
-void VeloxBackend::initConnectorFactory(const std::unordered_map<std::string, std::string>& conf) {
+void VeloxBackend::initConnector(const std::unordered_map<std::string, std::string>& conf) {
   int32_t ioThreads = std::stoi(getConfigValue(conf, kVeloxIOThreads, kVeloxIOThreadsDefault));
   int32_t splitPreloadPerDriver =
       std::stoi(getConfigValue(conf, kVeloxSplitPreloadPerDriver, kVeloxSplitPreloadPerDriverDefault));
+
+  if (splitPreloadPerDriver > 0 && ioThreads > 0) {
+    LOG(INFO) << "STARTUP: Using split preloading, Split preload per driver: " << splitPreloadPerDriver
+              << ", IO threads: " << ioThreads;
+  }
+
+  std::unordered_map<std::string, std::string> configurationValues;
+
+#ifdef ENABLE_S3
+  std::string awsAccessKey = conf.at("spark.hadoop.fs.s3a.access.key");
+  std::string awsSecretKey = conf.at("spark.hadoop.fs.s3a.secret.key");
+  std::string awsEndpoint = conf.at("spark.hadoop.fs.s3a.endpoint");
+  std::string sslEnabled = conf.at("spark.hadoop.fs.s3a.connection.ssl.enabled");
+  std::string pathStyleAccess = conf.at("spark.hadoop.fs.s3a.path.style.access");
+  std::string useInstanceCredentials = conf.at("spark.hadoop.fs.s3a.use.instance.credentials");
+  std::string iamRole = conf.at("spark.hadoop.fs.s3a.iam.role");
+  std::string iamRoleSessionName = conf.at("spark.hadoop.fs.s3a.iam.role.session.name");
+
+  const char* envAwsAccessKey = std::getenv("AWS_ACCESS_KEY_ID");
+  if (envAwsAccessKey != nullptr) {
+    awsAccessKey = std::string(envAwsAccessKey);
+  }
+  const char* envAwsSecretKey = std::getenv("AWS_SECRET_ACCESS_KEY");
+  if (envAwsSecretKey != nullptr) {
+    awsSecretKey = std::string(envAwsSecretKey);
+  }
+  const char* envAwsEndpoint = std::getenv("AWS_ENDPOINT");
+  if (envAwsEndpoint != nullptr) {
+    awsEndpoint = std::string(envAwsEndpoint);
+  }
+
+  std::unordered_map<std::string, std::string> s3Config({});
+  if (useInstanceCredentials == "true") {
+    s3Config.insert({
+        {"hive.s3.use-instance-credentials", useInstanceCredentials},
+    });
+  } else if (!iamRole.empty()) {
+    s3Config.insert({
+        {"hive.s3.iam-role", iamRole},
+    });
+    if (!iamRoleSessionName.empty()) {
+      s3Config.insert({
+          {"hive.s3.iam-role-session-name", iamRoleSessionName},
+      });
+    }
+  } else {
+    s3Config.insert({
{"hive.s3.aws-access-key", awsAccessKey}, + {"hive.s3.aws-secret-key", awsSecretKey}, + }); + } + // Only need to set s3 endpoint when not use instance credentials. + if (useInstanceCredentials != "true") { + s3Config.insert({ + {"hive.s3.endpoint", awsEndpoint}, + }); + } + s3Config.insert({ + {"hive.s3.ssl.enabled", sslEnabled}, + {"hive.s3.path-style-access", pathStyleAccess}, + }); + + configurationValues.merge(s3Config); +#endif + + auto properties = std::make_shared(configurationValues); + if (ioThreads > 0) { ioExecutor_ = std::make_unique(ioThreads); FLAGS_split_preload_per_driver = splitPreloadPerDriver; - // Use global memory pool for hive connector if SplitPreloadPerDriver was enabled. Otherwise the allocated memory + // Use global memory pool for hive connector if SplitPreloadPerDriver was enabled. Otherwise, the allocated memory // blocks might cause unexpected behavior (e.g. crash) since the allocations were proceed in background IO threads. - velox::connector::registerConnectorFactory( - std::make_shared(gluten::defaultLeafVeloxMemoryPool().get())); + velox::connector::registerConnector(std::make_shared( + gluten::defaultLeafVeloxMemoryPool().get(), kHiveConnectorId, properties, ioExecutor_.get())); } else { - velox::connector::registerConnectorFactory(std::make_shared()); - } - - if (splitPreloadPerDriver > 0 && ioThreads > 0) { - LOG(INFO) << "STARTUP: Using split preloading, Split preload per driver: " << splitPreloadPerDriver - << ", IO threads: " << ioThreads; + velox::connector::registerConnector( + std::make_shared(kHiveConnectorId, properties, ioExecutor_.get())); } } diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index a975a51ae640..25027c8792db 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -57,7 +57,7 @@ class VeloxBackend { void init(const std::unordered_map& conf); void initCache(const std::unordered_map& conf); - void initConnectorFactory(const std::unordered_map& conf); + void initConnector(const std::unordered_map& conf); void initUdf(const std::unordered_map& conf); void initJolFilesystem(const std::unordered_map& conf);