Skip to content

Commit

Permalink
CompactRow support serialize a row range
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Aug 29, 2024
1 parent c005c6b commit 77013d9
Show file tree
Hide file tree
Showing 8 changed files with 384 additions and 57 deletions.
3 changes: 3 additions & 0 deletions velox/docs/develop/serde/compactrow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ TIMESTAMP 8
UNKNOWN 0
================ ==============================================

Timestamps are serialized with microsecond precision to align with Spark's
handling of timestamps.

Strings (VARCHAR and VARBINARY) use 4 bytes for size plus the length of the
string. Empty string uses 4 bytes. 1-character string uses 5 bytes.
20-character ASCII string uses 24 bytes. Null strings do not take up space
Expand Down
4 changes: 4 additions & 0 deletions velox/row/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ velox_link_libraries(velox_row_fast PUBLIC velox_vector)
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()

if(${VELOX_ENABLE_BENCHMARKS})
add_subdirectory(benchmarks)
endif()
220 changes: 205 additions & 15 deletions velox/row/CompactRow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,147 @@
#include "velox/vector/FlatVector.h"

namespace facebook::velox::row {
namespace {
constexpr size_t kSizeBytes = sizeof(int32_t);

void writeInt32(char* buffer, int32_t n) {
memcpy(buffer, &n, kSizeBytes);
}

int32_t readInt32(const char* buffer) {
int32_t n;
memcpy(&n, buffer, kSizeBytes);
return n;
}

// Serialize a child vector of a row type within a list of rows.
// Write the serialized data at offsets of buffer row by row.
// Update offsets with the actual serialized size.
template <TypeKind kind>
void serializeTyped(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t valueBytes,
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
const auto mayHaveNulls = decoded.mayHaveNulls();
const auto* rawData = decoded.data<char>();

for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write fixed-width value.
memcpy(
buffer + offsets[i],
rawData + decoded.index(rows[i]) * valueBytes,
valueBytes);
}
offsets[i] += valueBytes;
}
}

template <>
void serializeTyped<TypeKind::UNKNOWN>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& /* unused */,
size_t /* unused */,
const raw_vector<uint8_t*>& nulls,
char* /* unused */,
std::vector<size_t>& /* unused */) {
for (auto i = 0; i < rows.size(); ++i) {
bits::setBit(nulls[i], childIdx, true);
}
}

template <>
void serializeTyped<TypeKind::BOOLEAN>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t /* unused */,
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
const auto mayHaveNulls = decoded.mayHaveNulls();
auto* byte = reinterpret_cast<bool*>(buffer);

for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write 1 byte for bool type.
byte[offsets[i]] = decoded.valueAt<bool>(rows[i]);
}
offsets[i] += 1;
}
}

template <>
void serializeTyped<TypeKind::TIMESTAMP>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t /* unused */,
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
const auto mayHaveNulls = decoded.mayHaveNulls();
const auto* rawData = decoded.data<Timestamp>();

for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write micros(int64_t) for timestamp value.
auto micros = rawData[rows[i]].toMicros();
memcpy(buffer + offsets[i], &micros, sizeof(int64_t));
}
offsets[i] += sizeof(int64_t);
}
}

template <>
void serializeTyped<TypeKind::VARCHAR>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t valueBytes,
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
const auto mayHaveNulls = decoded.mayHaveNulls();

for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
auto value = decoded.valueAt<StringView>(rows[i]);
writeInt32(buffer + offsets[i], value.size());
if (!value.empty()) {
memcpy(buffer + offsets[i] + kSizeBytes, value.data(), value.size());
}
offsets[i] += kSizeBytes + value.size();
}
}
}

template <>
void serializeTyped<TypeKind::VARBINARY>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t valueBytes,
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
serializeTyped<TypeKind::VARCHAR>(
rows, childIdx, decoded, valueBytes, nulls, buffer, offsets);
}
} // namespace

CompactRow::CompactRow(const RowVectorPtr& vector)
: typeKind_{vector->typeKind()}, decoded_{*vector} {
Expand Down Expand Up @@ -184,6 +325,62 @@ int32_t CompactRow::serializeRow(vector_size_t index, char* buffer) {
return valuesOffset;
}

void CompactRow::serializeRow(
vector_size_t offset,
vector_size_t size,
char* buffer,
const std::vector<size_t>& bufferOffsets) {
VELOX_CHECK_EQ(bufferOffsets.size(), size);

raw_vector<vector_size_t> rows(size);
raw_vector<uint8_t*> nulls(size);
if (decoded_.isIdentityMapping()) {
std::iota(rows.begin(), rows.end(), offset);
} else {
for (auto i = 0; i < size; ++i) {
rows[i] = decoded_.index(offset + i);
}
}

// After serializing each column, the 'offsets' are updated accordingly.
std::vector<size_t> offsets = bufferOffsets;
auto* base = reinterpret_cast<uint8_t*>(buffer);
for (auto i = 0; i < size; ++i) {
nulls[i] = base + offsets[i];
offsets[i] += rowNullBytes_;
}

for (auto childIdx = 0; childIdx < children_.size(); ++childIdx) {
auto& child = children_[childIdx];
if (childIsFixedWidth_[childIdx] ||
child.typeKind_ == TypeKind::VARBINARY ||
child.typeKind_ == TypeKind::VARCHAR) {
VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
serializeTyped,
child.typeKind_,
rows,
childIdx,
child.decoded_,
child.valueBytes_,
nulls,
buffer,
offsets);
} else {
auto mayHaveNulls = child.decoded_.mayHaveNulls();
for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && child.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write non-null variable-width value.
auto bytes =
child.serializeVariableWidth(rows[i], buffer + offsets[i]);
offsets[i] += bytes;
}
}
}
}
}

bool CompactRow::isNullAt(vector_size_t index) {
return decoded_.isNullAt(index);
}
Expand Down Expand Up @@ -281,21 +478,6 @@ int32_t CompactRow::serializeArray(vector_size_t index, char* buffer) {
children_[0], offset, size, childIsFixedWidth_[0], buffer);
}

namespace {

constexpr size_t kSizeBytes = sizeof(int32_t);

void writeInt32(char* buffer, int32_t n) {
memcpy(buffer, &n, sizeof(int32_t));
}

int32_t readInt32(const char* buffer) {
int32_t n;
memcpy(&n, buffer, sizeof(int32_t));
return n;
}
} // namespace

int32_t CompactRow::serializeAsArray(
CompactRow& elements,
vector_size_t offset,
Expand Down Expand Up @@ -420,6 +602,14 @@ int32_t CompactRow::serialize(vector_size_t index, char* buffer) {
return serializeRow(index, buffer);
}

void CompactRow::serialize(
vector_size_t offset,
vector_size_t size,
char* buffer,
const std::vector<size_t>& bufferOffsets) {
return serializeRow(offset, size, buffer, bufferOffsets);
}

void CompactRow::serializeFixedWidth(vector_size_t index, char* buffer) {
VELOX_DCHECK(fixedWidthTypeKind_);
switch (typeKind_) {
Expand Down
21 changes: 21 additions & 0 deletions velox/row/CompactRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include "velox/common/base/RawVector.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DecodedVector.h"

Expand All @@ -36,6 +37,18 @@ class CompactRow {
/// 'buffer' must have sufficient capacity and set to all zeros.
int32_t serialize(vector_size_t index, char* buffer);

/// Serializes rows in range [offset, offset + size) into 'buffer' at given
/// 'bufferOffsets'. 'buffer' must have sufficient capacity and set to all
/// zeros. 'bufferOffsets' must be pre-filled with the write offsets for each
/// row and must have the same number of elements as the 'size' parameter.
/// The caller must ensure that the space between each offset in
/// 'bufferOffsets' is no less than the 'fixedRowSize' or 'rowSize'.
void serialize(
vector_size_t offset,
vector_size_t size,
char* buffer,
const std::vector<size_t>& bufferOffsets);

/// Deserializes multiple rows into a RowVector of specified type. The type
/// must match the contents of the serialized rows.
static RowVectorPtr deserialize(
Expand Down Expand Up @@ -108,6 +121,14 @@ class CompactRow {
/// Serializes struct value to buffer. Value must not be null.
int32_t serializeRow(vector_size_t index, char* buffer);

/// Serializes struct values in range [offset, offset + size) to buffer.
/// Value must not be null.
void serializeRow(
vector_size_t offset,
vector_size_t size,
char* buffer,
const std::vector<size_t>& bufferOffsets);

const TypeKind typeKind_;
DecodedVector decoded_;

Expand Down
24 changes: 24 additions & 0 deletions velox/row/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

add_executable(velox_unsafe_row_serialize_benchmark
UnsafeRowSerializeBenchmark.cpp)
target_link_libraries(
velox_unsafe_row_serialize_benchmark
PRIVATE
velox_exec
velox_row_fast
velox_vector_fuzzer
Folly::folly
${FOLLY_BENCHMARK})
Loading

0 comments on commit 77013d9

Please sign in to comment.