Skip to content

Commit

Permalink
[VL] fix RoundRobinPartitioner by setting seed pre partition
Browse files Browse the repository at this point in the history
  • Loading branch information
zhli1142015 committed Nov 24, 2023
1 parent 41b8850 commit 3032d11
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 30 deletions.
2 changes: 2 additions & 0 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
jdouble reallocThreshold,
jlong firstBatchHandle,
jlong taskAttemptId,
jint partitionKeySeed,
jint pushBufferMaxSize,
jobject partitionPusher,
jstring partitionWriterTypeJstr) {
Expand Down Expand Up @@ -825,6 +826,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
}

shuffleWriterOptions.task_attempt_id = (int64_t)taskAttemptId;
shuffleWriterOptions.partition_key_seed = partitionKeySeed;
shuffleWriterOptions.compression_threshold = bufferCompressThreshold;

auto partitionWriterTypeC = env->GetStringUTFChars(partitionWriterTypeJstr, JNI_FALSE);
Expand Down
1 change: 1 addition & 0 deletions cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct ShuffleWriterOptions {

int64_t thread_id = -1;
int64_t task_attempt_id = -1;
int32_t partition_key_seed = 0;

arrow::ipc::IpcWriteOptions ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults();

Expand Down
7 changes: 5 additions & 2 deletions cpp/core/shuffle/Partitioner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@

namespace gluten {

arrow::Result<std::shared_ptr<Partitioner>> Partitioner::make(Partitioning partitioning, int32_t numPartitions) {
arrow::Result<std::shared_ptr<Partitioner>> Partitioner::make(
Partitioning partitioning,
int32_t numPartitions,
int32_t partitionKeySeed) {
switch (partitioning) {
case Partitioning::kHash:
return std::make_shared<HashPartitioner>(numPartitions);
case Partitioning::kRoundRobin:
return std::make_shared<RoundRobinPartitioner>(numPartitions);
return std::make_shared<RoundRobinPartitioner>(numPartitions, partitionKeySeed);
case Partitioning::kSingle:
return std::make_shared<SinglePartitioner>();
case Partitioning::kRange:
Expand Down
5 changes: 4 additions & 1 deletion cpp/core/shuffle/Partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ namespace gluten {

class Partitioner {
public:
static arrow::Result<std::shared_ptr<Partitioner>> make(Partitioning partitioning, int32_t numPartitions);
static arrow::Result<std::shared_ptr<Partitioner>> make(
Partitioning partitioning,
int32_t numPartitions,
int32_t partitionKeySeed);

// Whether the first column is partition key.
bool hasPid() const {
Expand Down
18 changes: 3 additions & 15 deletions cpp/core/shuffle/RoundRobinPartitioner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,9 @@ arrow::Status gluten::RoundRobinPartitioner::compute(
std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
row2Partition.resize(numRows);

int32_t pidSelection = pidSelection_;
for (int32_t i = 0; i < numRows;) {
int32_t low = i;
int32_t up = std::min((int64_t)(i + (numPartitions_ - pidSelection)), numRows);
for (; low != up;) {
row2Partition[low++] = pidSelection++;
}

pidSelection_ = pidSelection;
pidSelection = 0;
i = up;
}

if (pidSelection_ >= numPartitions_) {
pidSelection_ -= numPartitions_;
for (int32_t i = 0; i < numRows; ++i) {
pidSelection_ = (pidSelection_ + 1) % numPartitions_;
row2Partition[i] = pidSelection_;
}

for (auto& pid : row2Partition) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/shuffle/RoundRobinPartitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ namespace gluten {

class RoundRobinPartitioner final : public Partitioner {
public:
RoundRobinPartitioner(int32_t numPartitions) : Partitioner(numPartitions, false) {}
RoundRobinPartitioner(int32_t numPartitions, int32_t partitionKeySeed)
: Partitioner(numPartitions, false), pidSelection_(partitionKeySeed) {}

arrow::Status compute(
const int32_t* pidArr,
Expand Down
16 changes: 8 additions & 8 deletions cpp/core/tests/RoundRobinPartitionerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,18 @@ class RoundRobinPartitionerTest : public ::testing::Test {
};

TEST_F(RoundRobinPartitionerTest, TestInit) {
int numPart = 0;
prepareData(numPart);
int numPart = 2;
prepareData(numPart, 1);
ASSERT_NE(partitioner_, nullptr);
int32_t pidSelection = getPidSelection();
ASSERT_EQ(pidSelection, 0);
ASSERT_EQ(pidSelection, 1);
}

TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
// numRows equal numPart
{
int numPart = 10;
prepareData(numPart);
prepareData(numPart, 0);
int numRows = 10;
ASSERT_TRUE(partitioner_->compute(nullptr, numRows, row2Partition_, partition2RowCount_).ok());
ASSERT_EQ(getPidSelection(), 0);
Expand All @@ -85,7 +85,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
// numRows less than numPart
{
int numPart = 10;
prepareData(numPart);
prepareData(numPart, 0);
int numRows = 8;
ASSERT_TRUE(partitioner_->compute(nullptr, numRows, row2Partition_, partition2RowCount_).ok());
ASSERT_EQ(getPidSelection(), 8);
Expand All @@ -99,7 +99,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
// numRows greater than numPart
{
int numPart = 10;
prepareData(numPart);
prepareData(numPart, 0);
int numRows = 12;
ASSERT_TRUE(partitioner_->compute(nullptr, numRows, row2Partition_, partition2RowCount_).ok());
ASSERT_EQ(getPidSelection(), 2);
Expand All @@ -113,7 +113,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {
// numRows greater than 2*numPart
{
int numPart = 10;
prepareData(numPart);
prepareData(numPart, 0);
int numRows = 22;
ASSERT_TRUE(partitioner_->compute(nullptr, numRows, row2Partition_, partition2RowCount_).ok());
ASSERT_EQ(getPidSelection(), 2);
Expand All @@ -127,7 +127,7 @@ TEST_F(RoundRobinPartitionerTest, TestComoputeNormal) {

TEST_F(RoundRobinPartitionerTest, TestComoputeContinuous) {
int numPart = 10;
prepareData(numPart);
prepareData(numPart, 0);

{
int numRows = 8;
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ arrow::Status VeloxShuffleWriter::init() {
VELOX_CHECK_NOT_NULL(options_.memory_pool);

ARROW_ASSIGN_OR_RAISE(partitionWriter_, partitionWriterCreator_->make(this));
ARROW_ASSIGN_OR_RAISE(partitioner_, Partitioner::make(options_.partitioning, numPartitions_));
ARROW_ASSIGN_OR_RAISE(
partitioner_,
Partitioner::make(options_.partitioning, numPartitions_, options_.partition_key_seed));

// pre-allocated buffer size for each partition, unit is row count
// when partitioner is SinglePart, partial variables don`t need init
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SparkResourceUtil
import org.apache.spark.util.random.XORShiftRandom

import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
Expand Down Expand Up @@ -72,6 +73,12 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
} else {
val handle = ColumnarBatches.getNativeHandle(cb)
if (nativeShuffleWriter == -1L) {
val partitionKeySeed = dep.nativePartitioning.getShortName match {
case "rr" =>
new XORShiftRandom(context.partitionId())
.nextInt(dep.partitioner.numPartitions)
case _ => 0
}
nativeShuffleWriter = jniWrapper.makeForRSS(
dep.nativePartitioning,
nativeBufferSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public long make(
boolean writeEOS,
double reallocThreshold,
long handle,
long taskAttemptId) {
long taskAttemptId,
int partitionKeySeed) {
return nativeMake(
part.getShortName(),
part.getNumPartitions(),
Expand All @@ -81,6 +82,7 @@ public long make(
reallocThreshold,
handle,
taskAttemptId,
partitionKeySeed,
0,
null,
"local");
Expand All @@ -105,6 +107,7 @@ public long makeForRSS(
long memoryManagerHandle,
long handle,
long taskAttemptId,
int partitionKeySeed,
String partitionWriterType,
double reallocThreshold) {
return nativeMake(
Expand All @@ -123,6 +126,7 @@ public long makeForRSS(
reallocThreshold,
handle,
taskAttemptId,
partitionKeySeed,
pushBufferMaxSize,
pusher,
partitionWriterType);
Expand All @@ -144,6 +148,7 @@ public native long nativeMake(
double reallocThreshold,
long handle,
long taskAttemptId,
int partitionKeySeed,
int pushBufferMaxSize,
Object pusher,
String partitionWriterType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil, Utils}
import org.apache.spark.util.random.XORShiftRandom

import java.io.IOException

Expand Down Expand Up @@ -121,6 +122,12 @@ class ColumnarShuffleWriter[K, V](
val rows = cb.numRows()
val handle = ColumnarBatches.getNativeHandle(cb)
if (nativeShuffleWriter == -1L) {
val partitionKeySeed = dep.nativePartitioning.getShortName match {
case "rr" =>
new XORShiftRandom(taskContext.partitionId())
.nextInt(dep.partitioner.numPartitions)
case _ => 0
}
nativeShuffleWriter = jniWrapper.make(
dep.nativePartitioning,
nativeBufferSize,
Expand Down Expand Up @@ -155,7 +162,8 @@ class ColumnarShuffleWriter[K, V](
writeEOS,
reallocThreshold,
handle,
taskContext.taskAttemptId()
taskContext.taskAttemptId(),
partitionKeySeed
)
}
val startTime = System.nanoTime()
Expand Down

0 comments on commit 3032d11

Please sign in to comment.