diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index f87eaabb56ae..3f546873d469 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -180,7 +180,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr rowSize_.resize(inputRows, *fixedRowSize_); } - uint32_t rowOffset = 0; + facebook::velox::vector_size_t rowOffset = 0; while (rowOffset < inputRows) { auto remainingRows = inputRows - rowOffset; auto rows = maxRowsToInsert(rowOffset, remainingRows); @@ -201,18 +201,23 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr return arrow::Status::OK(); } -void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) { +void VeloxSortShuffleWriter::insertRows( + facebook::velox::row::CompactRow& compact, + facebook::velox::vector_size_t offset, + facebook::velox::vector_size_t size) { VELOX_CHECK(!pages_.empty()); - for (auto i = offset; i < offset + rows; ++i) { - auto pid = row2Partition_[i]; + std::vector offsets(size); + for (auto i = 0; i < size; ++i) { + auto row = offset + i; + auto pid = row2Partition_[row]; arrayPtr_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_); // size(RowSize) | bytes - memcpy(currentPage_ + pageCursor_, &rowSize_[i], sizeof(RowSizeType)); - pageCursor_ += sizeof(RowSizeType); - auto size = row.serialize(i, currentPage_ + pageCursor_); - pageCursor_ += size; + memcpy(currentPage_ + pageCursor_, &rowSize_[row], sizeof(RowSizeType)); + offsets[i] = pageCursor_ + sizeof(RowSizeType); + pageCursor_ += rowSize_[row]; VELOX_DCHECK_LE(pageCursor_, currenPageSize_); } + compact.serialize(offset, size, offsets, currentPage_); } arrow::Status VeloxSortShuffleWriter::maybeSpill(uint32_t nextRows) { @@ -337,19 +342,21 @@ VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, int32_t numRows, u return arrow::Status::OK(); } -uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t remainingRows) { +facebook::velox::vector_size_t VeloxSortShuffleWriter::maxRowsToInsert( + facebook::velox::vector_size_t offset, + facebook::velox::vector_size_t remainingRows) { // Check how many rows can be handled. if (pages_.empty()) { return 0; } auto remainingBytes = pages_.back()->size() - pageCursor_; if (fixedRowSize_) { - return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), remainingRows); + return std::min((facebook::velox::vector_size_t)(remainingBytes / (fixedRowSize_.value())), remainingRows); } auto beginIter = rowSizePrefixSum_.begin() + 1 + offset; auto bytesWritten = rowSizePrefixSum_[offset]; auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes + bytesWritten); - return iter - beginIter; + return (facebook::velox::vector_size_t)(iter - beginIter); } void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired) { diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h index 531ed1fe3e76..5b8cff452d56 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h @@ -69,7 +69,10 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { arrow::Status insert(const facebook::velox::RowVectorPtr& vector, int64_t memLimit); - void insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows); + void insertRows( + facebook::velox::row::CompactRow& compact, + facebook::velox::vector_size_t offset, + facebook::velox::vector_size_t size); arrow::Status maybeSpill(uint32_t nextRows); @@ -79,7 +82,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter { arrow::Status evictPartition0(uint32_t partitionId, int32_t numRows, uint8_t* buffer, int64_t rawLength); - uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows); + facebook::velox::vector_size_t maxRowsToInsert( + facebook::velox::vector_size_t offset, + facebook::velox::vector_size_t remainingRows); void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired);