Skip to content

Commit

Permalink
use vectorized c2r
Browse files: browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 3, 2024
1 parent d1924aa commit 9128319
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
29 changes: 18 additions & 11 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
rowSize_.resize(inputRows, *fixedRowSize_);
}

uint32_t rowOffset = 0;
facebook::velox::vector_size_t rowOffset = 0;
while (rowOffset < inputRows) {
auto remainingRows = inputRows - rowOffset;
auto rows = maxRowsToInsert(rowOffset, remainingRows);
Expand All @@ -201,18 +201,23 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
return arrow::Status::OK();
}

void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows) {
void VeloxSortShuffleWriter::insertRows(
facebook::velox::row::CompactRow& compact,
facebook::velox::vector_size_t offset,
facebook::velox::vector_size_t size) {
VELOX_CHECK(!pages_.empty());
for (auto i = offset; i < offset + rows; ++i) {
auto pid = row2Partition_[i];
std::vector<size_t> offsets(size);
for (auto i = 0; i < size; ++i) {
auto row = offset + i;
auto pid = row2Partition_[row];
arrayPtr_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_);
// size(RowSize) | bytes
memcpy(currentPage_ + pageCursor_, &rowSize_[i], sizeof(RowSizeType));
pageCursor_ += sizeof(RowSizeType);
auto size = row.serialize(i, currentPage_ + pageCursor_);
pageCursor_ += size;
memcpy(currentPage_ + pageCursor_, &rowSize_[row], sizeof(RowSizeType));
offsets[i] = pageCursor_ + sizeof(RowSizeType);
pageCursor_ += rowSize_[row];
VELOX_DCHECK_LE(pageCursor_, currenPageSize_);
}
compact.serialize(offset, size, offsets, currentPage_);
}

arrow::Status VeloxSortShuffleWriter::maybeSpill(uint32_t nextRows) {
Expand Down Expand Up @@ -337,19 +342,21 @@ VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, int32_t numRows, u
return arrow::Status::OK();
}

uint32_t VeloxSortShuffleWriter::maxRowsToInsert(uint32_t offset, uint32_t remainingRows) {
facebook::velox::vector_size_t VeloxSortShuffleWriter::maxRowsToInsert(
facebook::velox::vector_size_t offset,
facebook::velox::vector_size_t remainingRows) {
// Check how many rows can be handled.
if (pages_.empty()) {
return 0;
}
auto remainingBytes = pages_.back()->size() - pageCursor_;
if (fixedRowSize_) {
return std::min((uint32_t)(remainingBytes / (fixedRowSize_.value())), remainingRows);
return std::min((facebook::velox::vector_size_t)(remainingBytes / (fixedRowSize_.value())), remainingRows);
}
auto beginIter = rowSizePrefixSum_.begin() + 1 + offset;
auto bytesWritten = rowSizePrefixSum_[offset];
auto iter = std::upper_bound(beginIter, rowSizePrefixSum_.end(), remainingBytes + bytesWritten);
return iter - beginIter;
return (facebook::velox::vector_size_t)(iter - beginIter);
}

void VeloxSortShuffleWriter::acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired) {
Expand Down
9 changes: 7 additions & 2 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

arrow::Status insert(const facebook::velox::RowVectorPtr& vector, int64_t memLimit);

void insertRows(facebook::velox::row::CompactRow& row, uint32_t offset, uint32_t rows);
void insertRows(
facebook::velox::row::CompactRow& compact,
facebook::velox::vector_size_t offset,
facebook::velox::vector_size_t size);

arrow::Status maybeSpill(uint32_t nextRows);

Expand All @@ -79,7 +82,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

arrow::Status evictPartition0(uint32_t partitionId, int32_t numRows, uint8_t* buffer, int64_t rawLength);

uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows);
facebook::velox::vector_size_t maxRowsToInsert(
facebook::velox::vector_size_t offset,
facebook::velox::vector_size_t remainingRows);

void acquireNewBuffer(uint64_t memLimit, uint64_t minSizeRequired);

Expand Down

0 comments on commit 9128319

Please sign in to comment.