Skip to content

Commit

Permalink
decouple ShuffleWriter and PartitionWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Dec 5, 2023
1 parent 5580393 commit e7430aa
Show file tree
Hide file tree
Showing 25 changed files with 667 additions and 681 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ class MetricsApiImpl extends MetricsApi with Logging {
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions"),
"bytesSpilled" -> SQLMetrics.createSizeMetric(sparkContext, "shuffle bytes spilled"),
"splitBufferSize" -> SQLMetrics.createSizeMetric(sparkContext, "split buffer size total"),
"splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime to split"),
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
utils/StringUtil.cc
utils/ObjectStore.cc
jni/JniError.cc
jni/JniCommon.cc)
jni/JniCommon.cc shuffle/PartitionWriter.cc)

file(MAKE_DIRECTORY ${root_directory}/releases)
add_library(gluten SHARED ${SPARK_COLUMNAR_PLUGIN_SRCS})
Expand Down
236 changes: 149 additions & 87 deletions cpp/core/shuffle/LocalPartitionWriter.cc

Large diffs are not rendered by default.

40 changes: 30 additions & 10 deletions cpp/core/shuffle/LocalPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@ struct SpillInfo {

class LocalPartitionWriter : public ShuffleWriter::PartitionWriter {
public:
explicit LocalPartitionWriter(ShuffleWriter* shuffleWriter) : PartitionWriter(shuffleWriter) {}
explicit LocalPartitionWriter(uint32_t numPartitions, ShuffleWriterOptions* options)
: PartitionWriter(numPartitions, options) {}

arrow::Status init() override;

arrow::Status requestNextEvict(bool flush) override;

EvictHandle* getEvictHandle() override;
arrow::Status evict(
uint32_t partitionId,
uint32_t numRows,
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
Evictor::Type evictType) override;

arrow::Status finishEvict() override;

Expand All @@ -75,35 +78,52 @@ class LocalPartitionWriter : public ShuffleWriter::PartitionWriter {
/// If spill is triggered by 2.c, cached payloads of the remaining unmerged partitions will be spilled.
/// In both cases, if the cached payload size doesn't free enough memory,
/// it will shrink partition buffers to free more memory.
arrow::Status stop() override;
arrow::Status stop(ShuffleWriterMetrics* metrics) override;

arrow::Status evictFixedSize(int64_t size, int64_t* actual) override;

class LocalEvictHandle;
class LocalEvictor;

private:
arrow::Status requestEvict(Evictor::Type evictType);

LocalEvictor* getEvictHandle();

arrow::Status setLocalDirs();

std::string nextSpilledFileDir();

arrow::Status openDataFile();

arrow::Status mergeSpills(uint32_t partitionId);

arrow::Status clearResource();

std::shared_ptr<arrow::fs::LocalFileSystem> fs_{};
std::shared_ptr<LocalEvictHandle> evictHandle_;
std::vector<std::shared_ptr<SpillInfo>> spills_;
arrow::Status populateMetrics(ShuffleWriterMetrics* metrics);

std::shared_ptr<arrow::fs::LocalFileSystem> fs_{nullptr};
std::shared_ptr<LocalEvictor> evictor_{nullptr};
std::vector<std::shared_ptr<SpillInfo>> spills_{};

// configured local dirs for spilled file
int32_t dirSelection_ = 0;
std::vector<int32_t> subDirSelection_;
std::vector<std::string> configuredDirs_;

std::shared_ptr<arrow::io::OutputStream> dataFileOs_;
int64_t totalBytesEvicted_{0};
int64_t totalBytesWritten_{0};
std::vector<int64_t> partitionLengths_;
std::vector<int64_t> rawPartitionLengths_;
std::vector<std::tuple<uint32_t, uint32_t, std::vector<std::shared_ptr<arrow::Buffer>>>> cachedPartitionBuffers_;
};

class LocalPartitionWriterCreator : public ShuffleWriter::PartitionWriterCreator {
public:
LocalPartitionWriterCreator();

arrow::Result<std::shared_ptr<ShuffleWriter::PartitionWriter>> make(ShuffleWriter* shuffleWriter) override;
arrow::Result<std::shared_ptr<ShuffleWriter::PartitionWriter>> make(
uint32_t numPartitions,
ShuffleWriterOptions* options) override;
};
} // namespace gluten
3 changes: 3 additions & 0 deletions cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ struct ShuffleWriterOptions {

arrow::ipc::IpcWriteOptions ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults();

std::shared_ptr<arrow::Schema> write_schema{nullptr};
std::shared_ptr<arrow::util::Codec> codec{nullptr};

std::string data_file{};
std::string local_dirs{};
arrow::MemoryPool* memory_pool{};
Expand Down
51 changes: 51 additions & 0 deletions cpp/core/shuffle/PartitionWriter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "shuffle/PartitionWriter.h"
#include "shuffle/Utils.h"

namespace gluten {

arrow::Result<std::unique_ptr<arrow::ipc::IpcPayload>> gluten::ShuffleWriter::PartitionWriter::createPayloadFromBuffers(
uint32_t numRows,
std::vector<std::shared_ptr<arrow::Buffer>> buffers) {
std::shared_ptr<arrow::RecordBatch> recordBatch;
if (options_->compression_type != arrow::Compression::UNCOMPRESSED) {
ARROW_ASSIGN_OR_RAISE(
recordBatch,
makeCompressedRecordBatch(
numRows,
std::move(buffers),
options_->write_schema,
options_->ipc_write_options.memory_pool,
options_->codec.get(),
options_->compression_threshold,
options_->compression_mode,
compressTime_));
} else {
ARROW_ASSIGN_OR_RAISE(
recordBatch,
makeUncompressedRecordBatch(
numRows, std::move(buffers), options_->write_schema, options_->ipc_write_options.memory_pool));
}

auto payload = std::make_unique<arrow::ipc::IpcPayload>();
RETURN_NOT_OK(arrow::ipc::GetRecordBatchPayload(*recordBatch, options_->ipc_write_options, payload.get()));
return payload;
}

} // namespace gluten
53 changes: 37 additions & 16 deletions cpp/core/shuffle/PartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,61 @@

namespace gluten {

class EvictHandle {
class Evictor {
public:
virtual ~EvictHandle() = default;
enum Type { kCache, kFlush, kStop };

Evictor(ShuffleWriterOptions* options) : options_(options) {}

virtual ~Evictor() = default;

virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr<arrow::ipc::IpcPayload> payload) = 0;

virtual arrow::Status finish() = 0;

int64_t getEvictTime() {
return evictTime_;
}

protected:
ShuffleWriterOptions* options_;

int64_t evictTime_{0};
};

class ShuffleWriter::PartitionWriter {
class ShuffleWriter::PartitionWriter : public Evictable {
public:
PartitionWriter(ShuffleWriter* shuffleWriter) : shuffleWriter_(shuffleWriter) {}
PartitionWriter(uint32_t numPartitions, ShuffleWriterOptions* options)
: numPartitions_(numPartitions), options_(options) {}

virtual ~PartitionWriter() = default;

virtual arrow::Status init() = 0;

virtual arrow::Status stop() = 0;
virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0;

/// Request next evict. The caller can use `requestNextEvict` to start a evict, and choose to call
/// `getEvictHandle()->evict()` immediately, or to call it latter somewhere else.
/// The caller can start new evict multiple times. Once it's called, the last `EvictHandle`
/// will be finished automatically.
/// Evict buffers for `partitionId` partition.
/// \param flush Whether to flush the evicted data immediately. If it's false,
/// the data can be cached first.
virtual arrow::Status requestNextEvict(bool flush) = 0;

/// Get the current managed EvictHandle. Returns nullptr if the current EvictHandle was finished,
/// or requestNextEvict has not been called.
/// \return
virtual EvictHandle* getEvictHandle() = 0;
virtual arrow::Status evict(
uint32_t partitionId,
uint32_t numRows,
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
Evictor::Type evictType) = 0;

virtual arrow::Status finishEvict() = 0;

ShuffleWriter* shuffleWriter_;
protected:
arrow::Result<std::unique_ptr<arrow::ipc::IpcPayload>> createPayloadFromBuffers(
uint32_t numRows,
std::vector<std::shared_ptr<arrow::Buffer>> buffers);

uint32_t numPartitions_;

ShuffleWriterOptions* options_;
int64_t compressTime_{0};
int64_t evictTime_{0};
int64_t writeTime_{0};
};

} // namespace gluten
4 changes: 3 additions & 1 deletion cpp/core/shuffle/PartitionWriterCreator.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class ShuffleWriter::PartitionWriterCreator {
PartitionWriterCreator() = default;
virtual ~PartitionWriterCreator() = default;

virtual arrow::Result<std::shared_ptr<ShuffleWriter::PartitionWriter>> make(ShuffleWriter* shuffleWriter) = 0;
virtual arrow::Result<std::shared_ptr<ShuffleWriter::PartitionWriter>> make(
uint32_t numPartitions,
ShuffleWriterOptions* options) = 0;
};

} // namespace gluten
2 changes: 1 addition & 1 deletion cpp/core/shuffle/ShuffleReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ShuffleReaderOutStream : public ColumnarBatchIterator {
const std::function<void(int64_t)> ipcTimeAccumulator)
: options_(options), in_(in), ipcTimeAccumulator_(ipcTimeAccumulator) {
if (options.compression_type != arrow::Compression::UNCOMPRESSED) {
writeSchema_ = toCompressWriteSchema(*schema);
writeSchema_ = toCompressWriteSchema();
} else {
writeSchema_ = toWriteSchema(*schema);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/shuffle/ShuffleSchema.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ inline std::shared_ptr<arrow::Schema> toWriteSchema(arrow::Schema& schema) {
return std::make_shared<arrow::Schema>(fields);
}

inline std::shared_ptr<arrow::Schema> toCompressWriteSchema(arrow::Schema& schema) {
inline std::shared_ptr<arrow::Schema> toCompressWriteSchema() {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.emplace_back(std::make_shared<arrow::Field>("header", arrow::large_utf8()));
fields.emplace_back(std::make_shared<arrow::Field>("lengthBuffer", arrow::large_utf8()));
Expand Down
19 changes: 0 additions & 19 deletions cpp/core/shuffle/ShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,4 @@ namespace gluten {
// by default, allocate 8M block, 2M page size
#define SPLIT_BUFFER_SIZE 16 * 1024 * 1024
#endif

std::shared_ptr<arrow::Schema> ShuffleWriter::writeSchema() {
if (writeSchema_ != nullptr) {
return writeSchema_;
}

writeSchema_ = toWriteSchema(*schema_);
return writeSchema_;
}

std::shared_ptr<arrow::Schema> ShuffleWriter::compressWriteSchema() {
if (compressWriteSchema_ != nullptr) {
return compressWriteSchema_;
}

compressWriteSchema_ = toCompressWriteSchema(*schema_);
return compressWriteSchema_;
}

} // namespace gluten
Loading

0 comments on commit e7430aa

Please sign in to comment.