Skip to content

Commit

Permalink
Special-case serialization of single-row ranges
Browse files (browse the repository at this point in the history)
  • Loading branch information
marin-ma committed Sep 6, 2024
1 parent 25086c9 commit e018f54
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 34 deletions.
16 changes: 9 additions & 7 deletions velox/row/CompactRow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,7 @@ void CompactRow::serializeRow(
vector_size_t offset,
vector_size_t size,
char* buffer,
const std::vector<size_t>& bufferOffsets) {
VELOX_CHECK_EQ(bufferOffsets.size(), size);

const size_t* bufferOffsets) {
raw_vector<vector_size_t> rows(size);
raw_vector<uint8_t*> nulls(size);
if (decoded_.isIdentityMapping()) {
Expand All @@ -379,11 +377,11 @@ void CompactRow::serializeRow(
}

// After serializing each column, the 'offsets' are updated accordingly.
std::vector<size_t> offsets = bufferOffsets;
std::vector<size_t> offsets(size);
auto* base = reinterpret_cast<uint8_t*>(buffer);
for (auto i = 0; i < size; ++i) {
nulls[i] = base + offsets[i];
offsets[i] += rowNullBytes_;
nulls[i] = base + bufferOffsets[i];
offsets[i] = bufferOffsets[i] + rowNullBytes_;
}

// Fixed-width and varchar/varbinary types are serialized using the vectorized
Expand Down Expand Up @@ -644,7 +642,11 @@ void CompactRow::serialize(
vector_size_t offset,
vector_size_t size,
char* buffer,
const std::vector<size_t>& bufferOffsets) {
const size_t* bufferOffsets) {
if (size == 1) {
(void)serializeRow(offset, buffer + *bufferOffsets);
return;
}
return serializeRow(offset, size, buffer, bufferOffsets);
}

Expand Down
4 changes: 2 additions & 2 deletions velox/row/CompactRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class CompactRow {
vector_size_t offset,
vector_size_t size,
char* buffer,
const std::vector<size_t>& bufferOffsets);
const size_t* bufferOffsets);

/// Deserializes multiple rows into a RowVector of specified type. The type
/// must match the contents of the serialized rows.
Expand Down Expand Up @@ -128,7 +128,7 @@ class CompactRow {
vector_size_t offset,
vector_size_t size,
char* buffer,
const std::vector<size_t>& bufferOffsets);
const size_t* bufferOffsets);

const TypeKind typeKind_;
DecodedVector decoded_;
Expand Down
6 changes: 3 additions & 3 deletions velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,10 @@ class SerializeBenchmark {
CompactRow& compactRow,
vector_size_t numRows,
BufferPtr& buffer,
std::vector<size_t>& rowSize,
std::vector<size_t>& offsets) {
const std::vector<size_t>& rowSize,
const std::vector<size_t>& offsets) {
auto rawBuffer = buffer->asMutable<char>();
compactRow.serialize(0, numRows, rawBuffer, offsets);
compactRow.serialize(0, numRows, rawBuffer, offsets.data());

std::vector<std::string_view> serialized;
for (auto i = 0; i < numRows; ++i) {
Expand Down
2 changes: 1 addition & 1 deletion velox/row/tests/CompactRowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class CompactRowTest : public ::testing::Test, public VectorTestBase {
memset(rawBuffer, 0, totalSize);
{
std::vector<std::string_view> serialized;
row.serialize(0, numRows, rawBuffer, offsets);
row.serialize(0, numRows, rawBuffer, offsets.data());
for (auto i = 0; i < numRows; ++i) {
serialized.push_back(
std::string_view(rawBuffer + offsets[i], rowSize[i]));
Expand Down
53 changes: 32 additions & 21 deletions velox/serializers/CompactRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,25 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
const folly::Range<const IndexRange*>& ranges,
Scratch& scratch) override {
size_t totalSize = 0;
std::vector<std::vector<vector_size_t>> rowSize(ranges.size());
auto totalRows = std::accumulate(
ranges.begin(),
ranges.end(),
0,
[](vector_size_t sum, const auto& range) { return sum + range.size; });

row::CompactRow row(vector);
std::vector<vector_size_t> rowSize(totalRows);
if (auto fixedRowSize =
row::CompactRow::fixedRowSize(asRowType(vector->type()))) {
for (auto i = 0; i < ranges.size(); ++i) {
totalSize += (fixedRowSize.value() + sizeof(TRowSize)) * ranges[i].size;
rowSize[i].resize(ranges[i].size, fixedRowSize.value());
}
totalSize += (fixedRowSize.value() + sizeof(TRowSize)) * totalRows;
std::fill(rowSize.begin(), rowSize.end(), fixedRowSize.value());
} else {
for (auto i = 0; i < ranges.size(); ++i) {
const auto& range = ranges[i];
rowSize[i].resize(range.size);
for (auto j = 0; j < range.size; ++j) {
rowSize[i][j] = row.rowSize(range.begin + j);
totalSize += rowSize[i][j] + sizeof(TRowSize);
vector_size_t index = 0;
for (const auto& range : ranges) {
for (auto i = 0; i < range.size; ++i) {
rowSize[index] = row.rowSize(range.begin + i);
totalSize += rowSize[index] + sizeof(TRowSize);
index++;
}
}
}
Expand All @@ -68,17 +72,24 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
buffers_.push_back(std::move(buffer));

size_t offset = 0;
for (auto i = 0; i < ranges.size(); ++i) {
const auto& range = ranges[i];
std::vector<size_t> offsets(range.size);
for (auto j = 0; j < range.size; ++j) {
// Write raw size. Needs to be in big endian order.
*(TRowSize*)(rawBuffer + offset) = folly::Endian::big(rowSize[i][j]);
offsets[j] = offset + sizeof(TRowSize);
offset += rowSize[i][j] + sizeof(TRowSize);
vector_size_t index = 0;
for (const auto& range : ranges) {
if (range.size == 1) {
*(TRowSize*)(rawBuffer) = folly::Endian::big(rowSize[index++]);
static const auto offset = sizeof(TRowSize);
row.serialize(range.begin, range.size, rawBuffer, &offset);
} else {
raw_vector<size_t> offsets(range.size);
for (auto i = 0; i < range.size; ++i) {
// Write raw size. Needs to be in big endian order.
*(TRowSize*)(rawBuffer + offset) = folly::Endian::big(rowSize[index]);
offsets[i] = offset + sizeof(TRowSize);
offset += rowSize[index] + sizeof(TRowSize);
index++;
}
// Write row data for all rows in range.
row.serialize(range.begin, range.size, rawBuffer, offsets.data());
}
// Write row data for all rows in range.
row.serialize(range.begin, range.size, rawBuffer, offsets);
}
}

Expand Down

0 comments on commit e018f54

Please sign in to comment.