Skip to content

Commit

Permalink
[VL] Replace string comparisons with enum for shuffle partitioning (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma authored Nov 4, 2023
1 parent 13a0a63 commit 16e2874
Show file tree
Hide file tree
Showing 29 changed files with 153 additions and 92 deletions.
5 changes: 3 additions & 2 deletions cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,15 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
memory/ArrowMemoryPool.cc
memory/ColumnarBatch.cc
operators/writer/ArrowWriter.cc
shuffle/options.cc
shuffle/Options.cc
shuffle/ShuffleReader.cc
shuffle/ShuffleWriter.cc
shuffle/Partitioner.cc
shuffle/FallbackRangePartitioner.cc
shuffle/HashPartitioner.cc
shuffle/RoundRobinPartitioner.cc
shuffle/SinglePartPartitioner.cc
shuffle/SinglePartitioner.cc
shuffle/Partitioning.cc
shuffle/PartitionWriterCreator.cc
shuffle/LocalPartitionWriter.cc
shuffle/rss/RemotePartitionWriter.cc
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/benchmarks/CompressionBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include <utility>

#include "shuffle/ShuffleWriter.h"
#include "utils/compression.h"
#include "utils/Compression.h"
#include "utils/macros.h"

void printTrace(void) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
#include "config/GlutenConfig.h"
#include "memory/AllocationListener.h"
#include "shuffle/rss/RssClient.h"
#include "utils/Compression.h"
#include "utils/DebugOut.h"
#include "utils/compression.h"
#include "utils/exception.h"

static jint jniVersion = JNI_VERSION_1_8;
Expand Down
6 changes: 4 additions & 2 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/PartitionWriterCreator.h"
#include "shuffle/Partitioning.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "shuffle/Utils.h"
Expand Down Expand Up @@ -775,10 +776,11 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
return kInvalidResourceHandle;
}

auto shuffleWriterOptions = ShuffleWriterOptions::defaults();

auto partitioningName = jStringToCString(env, partitioningNameJstr);
shuffleWriterOptions.partitioning = gluten::toPartitioning(partitioningName);

auto shuffleWriterOptions = ShuffleWriterOptions::defaults();
shuffleWriterOptions.partitioning_name = partitioningName;
if (bufferSize > 0) {
shuffleWriterOptions.buffer_size = bufferSize;
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/core/shuffle/FallbackRangePartitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

namespace gluten {

class FallbackRangePartitioner final : public ShuffleWriter::Partitioner {
class FallbackRangePartitioner final : public Partitioner {
public:
FallbackRangePartitioner(int32_t numPartitions, bool hasPid) : Partitioner(numPartitions, hasPid) {}
FallbackRangePartitioner(int32_t numPartitions) : Partitioner(numPartitions, true) {}

arrow::Status compute(
const int32_t* pidArr,
Expand Down
4 changes: 2 additions & 2 deletions cpp/core/shuffle/HashPartitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

namespace gluten {

class HashPartitioner final : public ShuffleWriter::Partitioner {
class HashPartitioner final : public Partitioner {
public:
HashPartitioner(int32_t numPartitions, bool hasPid) : Partitioner(numPartitions, hasPid) {}
HashPartitioner(int32_t numPartitions) : Partitioner(numPartitions, true) {}

arrow::Status compute(
const int32_t* pidArr,
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/shuffle/options.cc → cpp/core/shuffle/Options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
* limitations under the License.
*/

#include "options.h"
#include "shuffle/Options.h"

gluten::ShuffleReaderOptions gluten::ShuffleReaderOptions::defaults() {
return {};
}
Expand Down
7 changes: 4 additions & 3 deletions cpp/core/shuffle/options.h → cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
#pragma once

#include <arrow/ipc/options.h>
#include "utils/compression.h"
#include "shuffle/Partitioning.h"
#include "utils/Compression.h"

namespace gluten {

Expand Down Expand Up @@ -52,14 +53,14 @@ struct ShuffleWriterOptions {
bool buffered_write = kEnableBufferedWrite;
bool write_eos = kWriteEos;

PartitionWriterType partition_writer_type = kLocal;
PartitionWriterType partition_writer_type = PartitionWriterType::kLocal;
Partitioning partitioning = Partitioning::kRoundRobin;

int64_t thread_id = -1;
int64_t task_attempt_id = -1;

arrow::ipc::IpcWriteOptions ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults();

std::string partitioning_name{};
std::string data_file{};
std::string local_dirs{};
arrow::MemoryPool* memory_pool{};
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/shuffle/PartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

#pragma once

#include "shuffle/Options.h"
#include "shuffle/ShuffleWriter.h"
#include "shuffle/options.h"

namespace gluten {

Expand Down
32 changes: 13 additions & 19 deletions cpp/core/shuffle/Partitioner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,22 @@
#include "shuffle/FallbackRangePartitioner.h"
#include "shuffle/HashPartitioner.h"
#include "shuffle/RoundRobinPartitioner.h"
#include "shuffle/SinglePartPartitioner.h"
#include "shuffle/SinglePartitioner.h"

namespace gluten {

arrow::Result<std::shared_ptr<ShuffleWriter::Partitioner>> ShuffleWriter::Partitioner::make(
const std::string& name,
int32_t numPartitions) {
std::shared_ptr<ShuffleWriter::Partitioner> partitioner = nullptr;
if (name == "hash") {
partitioner = ShuffleWriter::Partitioner::create<HashPartitioner>(numPartitions, true);
} else if (name == "rr") {
partitioner = ShuffleWriter::Partitioner::create<RoundRobinPartitioner>(numPartitions, false);
} else if (name == "range") {
partitioner = ShuffleWriter::Partitioner::create<FallbackRangePartitioner>(numPartitions, true);
} else if (name == "single") {
partitioner = ShuffleWriter::Partitioner::create<SinglePartPartitioner>(numPartitions, false);
}

if (!partitioner) {
return arrow::Status::NotImplemented("Partitioning " + name + " not supported yet.");
} else {
return partitioner;
arrow::Result<std::shared_ptr<Partitioner>> Partitioner::make(Partitioning partitioning, int32_t numPartitions) {
switch (partitioning) {
case Partitioning::kHash:
return std::make_shared<HashPartitioner>(numPartitions);
case Partitioning::kRoundRobin:
return std::make_shared<RoundRobinPartitioner>(numPartitions);
case Partitioning::kSingle:
return std::make_shared<SinglePartitioner>();
case Partitioning::kRange:
return std::make_shared<FallbackRangePartitioner>(numPartitions);
default:
return arrow::Status::Invalid("Unsupported partitioning type: " + std::to_string(partitioning));
}
}

Expand Down
22 changes: 9 additions & 13 deletions cpp/core/shuffle/Partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@

#pragma once

#include "shuffle/ShuffleWriter.h"
#include <arrow/result.h>
#include <memory>
#include <vector>
#include "shuffle/Partitioning.h"

namespace gluten {

class ShuffleWriter::Partitioner {
class Partitioner {
public:
template <typename Partitioner>
static std::shared_ptr<Partitioner> create(int32_t numPartitions, bool hasPid) {
return std::make_shared<Partitioner>(numPartitions, hasPid);
}

static arrow::Result<std::shared_ptr<ShuffleWriter::Partitioner>> make(
const std::string& name,
int32_t numPartitions);
static arrow::Result<std::shared_ptr<Partitioner>> make(Partitioning partitioning, int32_t numPartitions);

// whether the first column is partition key
// Whether the first column is partition key.
bool hasPid() const {
return hasPid_;
}
Expand All @@ -46,11 +42,11 @@ class ShuffleWriter::Partitioner {
protected:
Partitioner(int32_t numPartitions, bool hasPid) : numPartitions_(numPartitions), hasPid_(hasPid) {}

Partitioner() : numPartitions_(1), hasPid_(false) {}

virtual ~Partitioner() = default;

int32_t numPartitions_;

// if the first column is partition key
bool hasPid_;
};

Expand Down
45 changes: 45 additions & 0 deletions cpp/core/shuffle/Partitioning.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "shuffle/Partitioning.h"
#include "utils/exception.h"

namespace {
static const std::string kSinglePartitioningName = "single";
static const std::string kRoundRobinPartitioningName = "rr";
static const std::string kHashPartitioningName = "hash";
static const std::string kRangePartitioningName = "range";
} // namespace

namespace gluten {
Partitioning toPartitioning(std::string name) {
if (name == kSinglePartitioningName) {
return Partitioning::kSingle;
}
if (name == kRoundRobinPartitioningName) {
return Partitioning::kRoundRobin;
}
if (name == kHashPartitioningName) {
return Partitioning::kHash;
}
if (name == kRangePartitioningName) {
return Partitioning::kRange;
}
throw GlutenException("Invalid partition name: " + name);
}

} // namespace gluten
27 changes: 27 additions & 0 deletions cpp/core/shuffle/Partitioning.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <string>

namespace gluten {
enum Partitioning { kSingle, kRoundRobin, kHash, kRange };

Partitioning toPartitioning(std::string name);

} // namespace gluten
4 changes: 2 additions & 2 deletions cpp/core/shuffle/RoundRobinPartitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

namespace gluten {

class RoundRobinPartitioner final : public ShuffleWriter::Partitioner {
class RoundRobinPartitioner final : public Partitioner {
public:
RoundRobinPartitioner(int32_t numPartitions, bool hasPid) : Partitioner(numPartitions, hasPid) {}
RoundRobinPartitioner(int32_t numPartitions) : Partitioner(numPartitions, false) {}

arrow::Status compute(
const int32_t* pidArr,
Expand Down
4 changes: 2 additions & 2 deletions cpp/core/shuffle/ShuffleReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include <arrow/ipc/message.h>
#include <arrow/ipc/options.h>

#include "Options.h"
#include "compute/ResultIterator.h"
#include "options.h"
#include "utils/compression.h"
#include "utils/Compression.h"

namespace gluten {

Expand Down
8 changes: 4 additions & 4 deletions cpp/core/shuffle/ShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
#include "memory/ArrowMemoryPool.h"
#include "memory/ColumnarBatch.h"
#include "memory/Evictable.h"
#include "shuffle/options.h"
#include "utils/compression.h"
#include "shuffle/Options.h"
#include "shuffle/Partitioner.h"
#include "shuffle/Partitioning.h"
#include "utils/Compression.h"

namespace gluten {

Expand Down Expand Up @@ -169,8 +171,6 @@ class ShuffleWriter : public Evictable {

class PartitionWriter;

class Partitioner;

class PartitionWriterCreator;

protected:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* limitations under the License.
*/

#include "shuffle/SinglePartPartitioner.h"
#include "shuffle/SinglePartitioner.h"

namespace gluten {

arrow::Status gluten::SinglePartPartitioner::compute(
arrow::Status gluten::SinglePartitioner::compute(
const int32_t* pidArr,
const int64_t numRows,
std::vector<uint16_t>& row2partition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
#include "shuffle/Partitioner.h"

namespace gluten {
class SinglePartPartitioner final : public ShuffleWriter::Partitioner {
class SinglePartitioner final : public Partitioner {
public:
SinglePartPartitioner(int32_t numPartitions, bool hasPid) : Partitioner(numPartitions, hasPid) {}
SinglePartitioner() : Partitioner(1, false) {}

arrow::Status compute(
const int32_t* pidArr,
const int64_t numRows,
std::vector<uint16_t>& row2partition,
std::vector<uint32_t>& partition2RowCount) override;
};

} // namespace gluten
2 changes: 1 addition & 1 deletion cpp/core/shuffle/Utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

#include "shuffle/Utils.h"
#include "options.h"
#include "Options.h"
#include "utils/StringUtil.h"

#include <boost/uuid/uuid_generators.hpp>
Expand Down
4 changes: 2 additions & 2 deletions cpp/core/tests/RoundRobinPartitionerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
*/
#include "shuffle/RoundRobinPartitioner.h"
#include <gtest/gtest.h>
#include <cstdlib>
#include <numeric>

namespace gluten {
class RoundRobinPartitionerTest : public ::testing::Test {
protected:
void prepareData(int numPart) {
partitioner_ = ShuffleWriter::Partitioner::create<RoundRobinPartitioner>(numPart, false);
partitioner_ = std::make_shared<RoundRobinPartitioner>(numPart);
row2Partition_.clear();
partition2RowCount_.clear();
partition2RowCount_.resize(numPart);
Expand Down
Loading

0 comments on commit 16e2874

Please sign in to comment.