diff --git a/cpp/core/shuffle/Partitioner.cc b/cpp/core/shuffle/Partitioner.cc index c777f6ae2cfb..1b98f7a6702a 100644 --- a/cpp/core/shuffle/Partitioner.cc +++ b/cpp/core/shuffle/Partitioner.cc @@ -23,10 +23,7 @@ namespace gluten { -arrow::Result> Partitioner::make( - Partitioning partitioning, - int32_t numPartitions, - int32_t partitionKeySeed) { +arrow::Result> Partitioner::make(Partitioning partitioning, int32_t numPartitions, int32_t partitionKeySeed) { switch (partitioning) { case Partitioning::kHash: return std::make_shared(numPartitions); diff --git a/cpp/core/shuffle/Partitioner.h b/cpp/core/shuffle/Partitioner.h index f4517808de9d..e9576b36388f 100644 --- a/cpp/core/shuffle/Partitioner.h +++ b/cpp/core/shuffle/Partitioner.h @@ -26,10 +26,7 @@ namespace gluten { class Partitioner { public: - static arrow::Result> make( - Partitioning partitioning, - int32_t numPartitions, - int32_t partitionKeySeed); + static arrow::Result> make(Partitioning partitioning, int32_t numPartitions, int32_t partitionKeySeed); // Whether the first column is partition key. bool hasPid() const { diff --git a/cpp/core/shuffle/RoundRobinPartitioner.h b/cpp/core/shuffle/RoundRobinPartitioner.h index 50facf0cdb34..9577342784f4 100644 --- a/cpp/core/shuffle/RoundRobinPartitioner.h +++ b/cpp/core/shuffle/RoundRobinPartitioner.h @@ -23,8 +23,8 @@ namespace gluten { class RoundRobinPartitioner final : public Partitioner { public: - RoundRobinPartitioner(int32_t numPartitions, int32_t partitionKeySeed) - : Partitioner(numPartitions, false), pidSelection_(partitionKeySeed) {} + RoundRobinPartitioner(int32_t numPartitions, int32_t partitionKeySeed) + : Partitioner(numPartitions, false), pidSelection_(partitionKeySeed % numPartitions) {} arrow::Status compute( const int32_t* pidArr, diff --git a/cpp/core/tests/RoundRobinPartitionerTest.cc b/cpp/core/tests/RoundRobinPartitionerTest.cc index e29ebcd55d6a..79bfa8184590 100644 --- a/cpp/core/tests/RoundRobinPartitionerTest.cc +++ b/cpp/core/tests/RoundRobinPartitionerTest.cc @@ -63,7 +63,7 @@ class RoundRobinPartitionerTest : public ::testing::Test { TEST_F(RoundRobinPartitionerTest, TestInit) { int numPart = 2; - prepareData(numPart, 1); + prepareData(numPart, 3); ASSERT_NE(partitioner_, nullptr); int32_t pidSelection = getPidSelection(); ASSERT_EQ(pidSelection, 1); diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc b/cpp/velox/shuffle/VeloxShuffleWriter.cc index 8a5dd38309d0..82574513f2e7 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc @@ -411,9 +411,7 @@ arrow::Status VeloxShuffleWriter::init() { VELOX_CHECK_NOT_NULL(options_.memory_pool); ARROW_ASSIGN_OR_RAISE(partitionWriter_, partitionWriterCreator_->make(this)); - ARROW_ASSIGN_OR_RAISE( - partitioner_, - Partitioner::make(options_.partitioning, numPartitions_, options_.partition_key_seed)); + ARROW_ASSIGN_OR_RAISE(partitioner_, Partitioner::make(options_.partitioning, numPartitions_, options_.partition_key_seed)); // pre-allocated buffer size for each partition, unit is row count // when partitioner is SinglePart, partial variables don`t need init diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala index d75f307d5e39..71998ebb5345 100644 --- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala @@ -111,6 +111,7 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V]( .getNativeInstanceHandle, handle, context.taskAttemptId(), + partitionKeySeed, "celeborn", GlutenConfig.getConf.columnarShuffleReallocThreshold )