Skip to content

Commit

Permalink
[VL] Support s3 write (#3758)
Browse files Browse the repository at this point in the history
Support s3 write for Velox backend
  • Loading branch information
yma11 authored Nov 22, 2023
1 parent 5966579 commit 5ab5646
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 3 deletions.
5 changes: 4 additions & 1 deletion cpp/velox/benchmarks/ParquetWriteBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,10 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public GoogleBenchmar
for (auto _ : state) {
// Init VeloxParquetDataSource
auto veloxParquetDatasource = std::make_unique<gluten::VeloxParquetDatasource>(
outputPath_ + "/" + fileName, veloxPool->addAggregateChild("writer_benchmark"), localSchema);
outputPath_ + "/" + fileName,
veloxPool->addAggregateChild("writer_benchmark"),
veloxPool->addLeafChild("s3_sink_pool"),
localSchema);

veloxParquetDatasource->init(runtime->getConfMap());
auto start = std::chrono::steady_clock::now();
Expand Down
6 changes: 5 additions & 1 deletion cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ std::shared_ptr<Datasource> VeloxRuntime::createDatasource(
MemoryManager* memoryManager,
std::shared_ptr<arrow::Schema> schema) {
auto veloxPool = getAggregateVeloxPool(memoryManager);
return std::make_shared<VeloxParquetDatasource>(filePath, veloxPool, schema);
// Pass a dedicate pool for S3 sink as can't share veloxPool
// with parquet writer.
auto s3SinkPool = getLeafVeloxPool(memoryManager);

return std::make_shared<VeloxParquetDatasource>(filePath, veloxPool, s3SinkPool, schema);
}

std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
Expand Down
10 changes: 10 additions & 0 deletions cpp/velox/operators/writer/VeloxParquetDatasource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ void VeloxParquetDatasource::init(const std::unordered_map<std::string, std::str
auto path = filePath_.substr(5);
auto localWriteFile = std::make_unique<LocalWriteFile>(path, true, false);
sink_ = std::make_unique<WriteFileSink>(std::move(localWriteFile), path);
} else if (strncmp(filePath_.c_str(), "s3a:", 4) == 0) {
#ifdef ENABLE_S3
auto fileSystem = getFileSystem(filePath_, nullptr);
auto* s3FileSystem = dynamic_cast<filesystems::S3FileSystem*>(fileSystem.get());
sink_ = std::make_unique<dwio::common::WriteFileSink>(
s3FileSystem->openFileForWrite(filePath_, {{}, s3SinkPool_.get()}), filePath_);
#else
throw std::runtime_error(
"The write path is S3 path but the S3 haven't been enabled when writing parquet data in velox runtime!");
#endif
} else if (strncmp(filePath_.c_str(), "hdfs:", 5) == 0) {
#ifdef ENABLE_HDFS
std::string pathSuffix = getHdfsPath(filePath_, HdfsFileSystem::kScheme);
Expand Down
12 changes: 11 additions & 1 deletion cpp/velox/operators/writer/VeloxParquetDatasource.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
#include "operators/writer/Datasource.h"

#include "velox/common/file/FileSystems.h"
#ifdef ENABLE_S3
#include "velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Util.h"
#endif
#ifdef ENABLE_HDFS
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsUtil.h"
Expand All @@ -49,8 +53,13 @@ class VeloxParquetDatasource final : public Datasource {
VeloxParquetDatasource(
const std::string& filePath,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> s3SinkPool,
std::shared_ptr<arrow::Schema> schema)
: Datasource(filePath, schema), filePath_(filePath), schema_(schema), pool_(std::move(veloxPool)) {}
: Datasource(filePath, schema),
filePath_(filePath),
schema_(schema),
pool_(std::move(veloxPool)),
s3SinkPool_(std::move(s3SinkPool)) {}

void init(const std::unordered_map<std::string, std::string>& sparkConfs) override;
void inspectSchema(struct ArrowSchema* out) override;
Expand All @@ -69,6 +78,7 @@ class VeloxParquetDatasource final : public Datasource {
std::shared_ptr<const facebook::velox::Type> type_;
std::shared_ptr<facebook::velox::parquet::Writer> parquetWriter_;
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_;
std::shared_ptr<facebook::velox::memory::MemoryPool> s3SinkPool_;
std::unique_ptr<facebook::velox::dwio::common::FileSink> sink_;
};

Expand Down

0 comments on commit 5ab5646

Please sign in to comment.