Skip to content

Commit

Permalink
CompactRow support serialize a row range
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Aug 12, 2024
1 parent ad1f393 commit e3996d0
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 55 deletions.
4 changes: 4 additions & 0 deletions velox/row/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ velox_link_libraries(velox_row_fast PUBLIC velox_vector)
# Add the 'tests' subdirectory only when VELOX_BUILD_TESTING is enabled.
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif()

# Add the 'benchmark' subdirectory only when VELOX_ENABLE_BENCHMARKS is
# enabled.
if(${VELOX_ENABLE_BENCHMARKS})
add_subdirectory(benchmark)
endif()
222 changes: 207 additions & 15 deletions velox/row/CompactRow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,153 @@
#include "velox/vector/FlatVector.h"

namespace facebook::velox::row {
namespace {
/// Width in bytes of an int32_t as stored in the serialized buffer.
constexpr size_t kSizeBytes = sizeof(int32_t);

/// Copies the bytes of 'n' to 'buffer' in native byte order. Uses memcpy, so
/// 'buffer' does not need to be aligned for int32_t.
void writeInt32(char* buffer, int32_t n) {
  memcpy(buffer, &n, sizeof(n));
}

/// Reads back an int32_t previously stored with writeInt32. Uses memcpy, so
/// 'buffer' does not need to be aligned for int32_t.
int32_t readInt32(const char* buffer) {
  int32_t value;
  memcpy(&value, buffer, sizeof(value));
  return value;
}

template <TypeKind kind>
void serializeTyped(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t valueBytes,
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
const auto mayHaveNulls = decoded.mayHaveNulls();
const auto* rawData = decoded.data<char>();

for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write fixed-width value.
memcpy(
buffer + offsets[i],
rawData + decoded.index(rows[i]) * valueBytes,
valueBytes);
}
offsets[i] += valueBytes;
}
}

/// Specialization for UNKNOWN: every value is treated as null. Only sets bit
/// 'childIdx' in each row's null bits; nothing is written to 'buffer' and
/// 'offsets' is left untouched.
template <>
void serializeTyped<TypeKind::UNKNOWN>(
    const raw_vector<vector_size_t>& rows,
    uint32_t childIdx,
    DecodedVector& decoded,
    size_t valueBytes,
    const raw_vector<uint8_t*>& nulls,
    char* buffer,
    std::vector<size_t>& offsets) {
  const auto numRows = rows.size();
  for (auto rowIdx = 0; rowIdx < numRows; ++rowIdx) {
    bits::setBit(nulls[rowIdx], childIdx, true);
  }
}

/// Specialization for BOOLEAN. Non-null values are read with
/// 'decoded.valueAt<bool>' and stored as a bool at 'buffer + offsets[i]'. Null
/// values set bit 'childIdx' in 'nulls[i]'. 'offsets[i]' is advanced by
/// 'valueBytes' in both cases.
template <>
void serializeTyped<TypeKind::BOOLEAN>(
    const raw_vector<vector_size_t>& rows,
    uint32_t childIdx,
    DecodedVector& decoded,
    size_t valueBytes,
    const raw_vector<uint8_t*>& nulls,
    char* buffer,
    std::vector<size_t>& offsets) {
  const bool checkNulls = decoded.mayHaveNulls();
  auto* const boolBuffer = reinterpret_cast<bool*>(buffer);
  const auto numRows = rows.size();

  for (auto i = 0; i < numRows; ++i) {
    const auto row = rows[i];
    if (!checkNulls || !decoded.isNullAt(row)) {
      // Non-null: store the boolean in its fixed-width slot.
      boolBuffer[offsets[i]] = decoded.valueAt<bool>(row);
    } else {
      bits::setBit(nulls[i], childIdx, true);
    }
    offsets[i] += valueBytes;
  }
}

/// Specialization for TIMESTAMP. Non-null values are converted to
/// microseconds and written as an int64_t at 'buffer + offsets[i]'. Null
/// values set bit 'childIdx' in 'nulls[i]'. 'offsets[i]' is advanced by
/// 'valueBytes' in both cases.
template <>
void serializeTyped<TypeKind::TIMESTAMP>(
    const raw_vector<vector_size_t>& rows,
    uint32_t childIdx,
    DecodedVector& decoded,
    size_t valueBytes,
    const raw_vector<uint8_t*>& nulls,
    char* buffer,
    std::vector<size_t>& offsets) {
  const auto mayHaveNulls = decoded.mayHaveNulls();

  for (auto i = 0; i < rows.size(); ++i) {
    if (mayHaveNulls && decoded.isNullAt(rows[i])) {
      bits::setBit(nulls[i], childIdx, true);
    } else {
      // Read through the decoded index mapping. 'rows[i]' is a row number of
      // 'decoded', not a position in its base data, so indexing the raw data
      // with it directly picks the wrong value whenever 'decoded' has a
      // non-identity mapping.
      const int64_t micros = decoded.valueAt<Timestamp>(rows[i]).toMicros();
      memcpy(buffer + offsets[i], &micros, sizeof(micros));
    }
    offsets[i] += valueBytes;
  }
}

/// Serializes one string (VARCHAR / VARBINARY) child column for a batch of
/// rows.
///
/// A non-null value is written at 'buffer + offsets[i]' as an int32 size
/// followed by the string bytes, and 'offsets[i]' is advanced by
/// 'kSizeBytes + size'. A null value only sets bit 'childIdx' in 'nulls[i]';
/// it writes nothing and leaves 'offsets[i]' unchanged.
void serializeVarcharAndVarBinary(
    const raw_vector<vector_size_t>& rows,
    uint32_t childIdx,
    DecodedVector& decoded,
    const raw_vector<uint8_t*>& nulls,
    char* buffer,
    std::vector<size_t>& offsets) {
  const bool checkNulls = decoded.mayHaveNulls();
  const auto numRows = rows.size();

  for (auto i = 0; i < numRows; ++i) {
    const auto row = rows[i];
    if (checkNulls && decoded.isNullAt(row)) {
      bits::setBit(nulls[i], childIdx, true);
      continue;
    }

    const auto value = decoded.valueAt<StringView>(row);
    char* const out = buffer + offsets[i];
    writeInt32(out, value.size());
    if (!value.empty()) {
      memcpy(out + kSizeBytes, value.data(), value.size());
    }
    offsets[i] += kSizeBytes + value.size();
  }
}

/// Specialization for VARCHAR. Forwards to the shared string serializer;
/// 'valueBytes' is not used.
template <>
void serializeTyped<TypeKind::VARCHAR>(
    const raw_vector<vector_size_t>& rows,
    uint32_t childIdx,
    DecodedVector& decoded,
    size_t valueBytes,
    const raw_vector<uint8_t*>& nulls,
    char* buffer,
    std::vector<size_t>& offsets) {
  serializeVarcharAndVarBinary(
      rows,
      childIdx,
      decoded,
      nulls,
      buffer,
      offsets);
}

/// Specialization for VARBINARY. Forwards to the shared string serializer;
/// 'valueBytes' is not used.
template <>
void serializeTyped<TypeKind::VARBINARY>(
    const raw_vector<vector_size_t>& rows,
    uint32_t childIdx,
    DecodedVector& decoded,
    size_t valueBytes,
    const raw_vector<uint8_t*>& nulls,
    char* buffer,
    std::vector<size_t>& offsets) {
  serializeVarcharAndVarBinary(
      rows,
      childIdx,
      decoded,
      nulls,
      buffer,
      offsets);
}
} // namespace

CompactRow::CompactRow(const RowVectorPtr& vector)
: typeKind_{vector->typeKind()}, decoded_{*vector} {
Expand Down Expand Up @@ -184,6 +331,59 @@ int32_t CompactRow::serializeRow(vector_size_t index, char* buffer) {
return valuesOffset;
}

/// Serializes the struct values in 'indexRange' into 'buffer'.
///
/// Row i of the range is written starting at 'buffer + offsets[i]': first
/// 'rowNullBytes_' bytes holding the null bits, then the children one after
/// another. Children are processed column by column across all rows of the
/// range. 'offsets.size()' must equal 'indexRange.size'; each 'offsets[i]' is
/// advanced past the bytes written for row i.
void CompactRow::serializeRow(
    const IndexRange& indexRange,
    char* buffer,
    std::vector<size_t>& offsets) {
  VELOX_CHECK_EQ(offsets.size(), indexRange.size);

  const auto numRows = indexRange.size;
  raw_vector<vector_size_t> rows(numRows);
  raw_vector<uint8_t*> nulls(numRows);

  // Map positions in the range to row numbers via the decoded indices.
  if (!decoded_.isIdentityMapping()) {
    for (auto i = 0; i < numRows; ++i) {
      rows[i] = decoded_.index(indexRange.begin + i);
    }
  } else {
    std::iota(rows.begin(), rows.end(), indexRange.begin);
  }

  // The null bits sit at the start of each row; remember where they are and
  // skip past them.
  auto* const nullsBase = reinterpret_cast<uint8_t*>(buffer);
  for (auto i = 0; i < numRows; ++i) {
    nulls[i] = nullsBase + offsets[i];
    offsets[i] += rowNullBytes_;
  }

  const auto numChildren = children_.size();
  for (auto childIdx = 0; childIdx < numChildren; ++childIdx) {
    auto& child = children_[childIdx];
    const bool useTypedPath = childIsFixedWidth_[childIdx] ||
        child.typeKind_ == TypeKind::VARBINARY ||
        child.typeKind_ == TypeKind::VARCHAR;

    if (!useTypedPath) {
      // Remaining variable-width children are serialized value by value.
      const auto mayHaveNulls = child.decoded_.mayHaveNulls();
      for (auto i = 0; i < numRows; ++i) {
        if (mayHaveNulls && child.isNullAt(rows[i])) {
          bits::setBit(nulls[i], childIdx, true);
          continue;
        }
        // Write non-null variable-width value.
        const auto size =
            child.serializeVariableWidth(rows[i], buffer + offsets[i]);
        offsets[i] += size;
      }
      continue;
    }

    // Fixed-width and string children go through the per-type batch
    // serializers.
    VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
        serializeTyped,
        child.typeKind_,
        rows,
        childIdx,
        child.decoded_,
        child.valueBytes_,
        nulls,
        buffer,
        offsets);
  }
}

/// Returns true if the decoded vector reports a null at 'index'.
bool CompactRow::isNullAt(vector_size_t index) {
  const bool isNull = decoded_.isNullAt(index);
  return isNull;
}
Expand Down Expand Up @@ -281,21 +481,6 @@ int32_t CompactRow::serializeArray(vector_size_t index, char* buffer) {
children_[0], offset, size, childIsFixedWidth_[0], buffer);
}

namespace {

/// Width in bytes of an int32_t as stored in the serialized buffer.
constexpr size_t kSizeBytes = sizeof(int32_t);

/// Copies the bytes of 'n' to 'buffer' in native byte order. Uses memcpy, so
/// 'buffer' does not need to be aligned for int32_t.
void writeInt32(char* buffer, int32_t n) {
  memcpy(buffer, &n, sizeof(n));
}

/// Reads back an int32_t previously stored with writeInt32. Uses memcpy, so
/// 'buffer' does not need to be aligned for int32_t.
int32_t readInt32(const char* buffer) {
  int32_t value;
  memcpy(&value, buffer, sizeof(value));
  return value;
}
} // namespace

int32_t CompactRow::serializeAsArray(
CompactRow& elements,
vector_size_t offset,
Expand Down Expand Up @@ -420,6 +605,13 @@ int32_t CompactRow::serialize(vector_size_t index, char* buffer) {
return serializeRow(index, buffer);
}

/// Serializes the rows in 'indexRange' into 'buffer' by delegating to the
/// range overload of serializeRow. 'offsets' is updated by that call.
void CompactRow::serialize(
    const IndexRange& indexRange,
    char* buffer,
    std::vector<size_t>& offsets) {
  serializeRow(indexRange, buffer, offsets);
}

void CompactRow::serializeFixedWidth(vector_size_t index, char* buffer) {
VELOX_DCHECK(fixedWidthTypeKind_);
switch (typeKind_) {
Expand Down
16 changes: 16 additions & 0 deletions velox/row/CompactRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "velox/vector/ComplexVector.h"
#include "velox/vector/DecodedVector.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::row {

Expand All @@ -36,6 +37,15 @@ class CompactRow {
/// 'buffer' must have sufficient capacity and set to all zeros.
int32_t serialize(vector_size_t index, char* buffer);

/// Serializes the rows in 'indexRange' into 'buffer'. The i-th row of the
/// range is written starting at 'buffer + offsets[i]'.
/// 'buffer' must have sufficient capacity and be set to all zeros.
/// 'offsets' must have exactly 'indexRange.size' entries.
/// On return, each 'offsets[i]' has been advanced by the number of bytes
/// written for the i-th row.
void serialize(
const IndexRange& indexRange,
char* buffer,
std::vector<size_t>& offsets);

/// Deserializes multiple rows into a RowVector of specified type. The type
/// must match the contents of the serialized rows.
static RowVectorPtr deserialize(
Expand Down Expand Up @@ -108,6 +118,12 @@ class CompactRow {
/// Serializes the struct value at 'index' into 'buffer'. The value must not
/// be null.
/// NOTE(review): the int32_t return value is presumably the number of bytes
/// written; confirm against the definition.
int32_t serializeRow(vector_size_t index, char* buffer);

/// Range variant of serializeRow. Serializes the struct values in
/// 'indexRange' into 'buffer', writing the i-th row of the range at
/// 'buffer + offsets[i]' and advancing 'offsets[i]' by the bytes written.
/// 'offsets' must have exactly 'indexRange.size' entries. Values must not be
/// null.
void serializeRow(
const IndexRange& indexRange,
char* buffer,
std::vector<size_t>& offsets);

const TypeKind typeKind_;
DecodedVector decoded_;

Expand Down
23 changes: 23 additions & 0 deletions velox/row/benchmark/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Benchmark executable built from UnsafeRowSerializeBenchmark.cpp.
add_executable(velox_unsafe_row_serialize_benchmark UnsafeRowSerializeBenchmark.cpp)
# Link the Velox libraries the benchmark uses, plus Folly and its benchmark
# library (provided through the FOLLY_BENCHMARK variable).
target_link_libraries(
velox_unsafe_row_serialize_benchmark
PRIVATE
velox_exec
velox_row_fast
velox_vector_fuzzer
Folly::folly
${FOLLY_BENCHMARK})
Loading

0 comments on commit e3996d0

Please sign in to comment.