Skip to content

Commit

Permalink
support range serialization for UnsafeRowFast
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Oct 9, 2024
1 parent acd5717 commit eb8a021
Show file tree
Hide file tree
Showing 3 changed files with 370 additions and 7 deletions.
312 changes: 311 additions & 1 deletion velox/row/UnsafeRowFast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/row/UnsafeRowFast.h"
#include "velox/common/base/RawVector.h"

namespace facebook::velox::row {

Expand All @@ -31,6 +32,228 @@ int32_t alignBytes(int32_t numBytes) {
bool isFixedWidth(const TypePtr& type) {
return type->isFixedWidth() && !type->isLongDecimal();
}

FOLLY_ALWAYS_INLINE void writeFixedWidth(
char* buffer,
const char* rawData,
vector_size_t index,
size_t valueBytes) {
memcpy(buffer, rawData + index * valueBytes, valueBytes);
}

FOLLY_ALWAYS_INLINE void writeTimestamp(
char* buffer,
const Timestamp& timestamp) {
// Write micros(int64_t) for timestamp value.
const auto micros = timestamp.toMicros();
memcpy(buffer, &micros, sizeof(int64_t));
}

FOLLY_ALWAYS_INLINE void writeString(
char* buffer,
char* rowBase,
size_t& variableWidthOffset,
const StringView& value) {
uint64_t sizeAndOffset = variableWidthOffset << 32 | value.size();
*reinterpret_cast<uint64_t*>(buffer) = sizeAndOffset;

if (!value.empty()) {
memcpy(rowBase + variableWidthOffset, value.data(), value.size());
variableWidthOffset += alignBytes(value.size());
}
}

FOLLY_ALWAYS_INLINE void writeLongDecimal(
char* buffer,
char* rowBase,
size_t& variableWidthOffset,
const int128_t& value) {
auto serializedLength =
DecimalUtil::toByteArray(value, rowBase + variableWidthOffset);
uint64_t sizeAndOffset = variableWidthOffset << 32 | serializedLength;
*reinterpret_cast<uint64_t*>(buffer) = sizeAndOffset;
variableWidthOffset += alignBytes(serializedLength);
}

// Serialize a child vector of a row type within a list of rows.
// Write the serialized data at offsets of buffer row by row.
// Update offsets with the actual serialized size.
template <TypeKind kind>
void serializeTyped(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t valueBytes,
const raw_vector<char*>& nulls,
const raw_vector<char*>& data,
std::vector<size_t>& /*unused*/) {
const auto* rawData = decoded.data<char>();
const auto childOffset = childIdx * kFieldWidth;
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
writeFixedWidth(
data[i] + childOffset, rawData, decoded.index(rows[i]), valueBytes);
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
writeFixedWidth(
data[i] + childOffset, rawData, decoded.index(rows[i]), valueBytes);
}
}
}
}

template <>
void serializeTyped<TypeKind::UNKNOWN>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& /* unused */,
size_t /* unused */,
const raw_vector<char*>& nulls,
const raw_vector<char*>& /*unused*/,
std::vector<size_t>& /*unused*/) {
for (auto i = 0; i < rows.size(); ++i) {
bits::setBit(nulls[i], childIdx, true);
}
}

template <>
void serializeTyped<TypeKind::BOOLEAN>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t /* unused */,
const raw_vector<char*>& nulls,
const raw_vector<char*>& data,
std::vector<size_t>& /*unused*/) {
const auto childOffset = childIdx * kFieldWidth;
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
*reinterpret_cast<bool*>(data[i] + childOffset) =
decoded.valueAt<bool>(rows[i]);
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write 1 byte for bool type.
*reinterpret_cast<bool*>(data[i] + childOffset) =
decoded.valueAt<bool>(rows[i]);
}
}
}
}

template <>
void serializeTyped<TypeKind::TIMESTAMP>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t /* unused */,
const raw_vector<char*>& nulls,
const raw_vector<char*>& data,
std::vector<size_t>& /*unused*/) {
const auto childOffset = childIdx * kFieldWidth;
const auto* rawData = decoded.data<Timestamp>();
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
auto index = decoded.index(rows[i]);
writeTimestamp(data[i] + childOffset, rawData[index]);
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
auto index = decoded.index(rows[i]);
writeTimestamp(data[i] + childOffset, rawData[index]);
}
}
}
}

template <>
void serializeTyped<TypeKind::VARCHAR>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t /*unused*/,
const raw_vector<char*>& nulls,
const raw_vector<char*>& data,
std::vector<size_t>& variableWidthOffsets) {
const auto childOffset = childIdx * kFieldWidth;
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
writeString(
data[i] + childOffset,
nulls[i],
variableWidthOffsets[i],
decoded.valueAt<StringView>(rows[i]));
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
writeString(
data[i] + childOffset,
nulls[i],
variableWidthOffsets[i],
decoded.valueAt<StringView>(rows[i]));
}
}
}
}

template <>
void serializeTyped<TypeKind::VARBINARY>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t valueBytes,
const raw_vector<char*>& nulls,
const raw_vector<char*>& data,
std::vector<size_t>& variableWidthOffsets) {
serializeTyped<TypeKind::VARCHAR>(
rows, childIdx, decoded, valueBytes, nulls, data, variableWidthOffsets);
}

template <>
void serializeTyped<TypeKind::HUGEINT>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
size_t /*unused*/,
const raw_vector<char*>& nulls,
const raw_vector<char*>& data,
std::vector<size_t>& variableWidthOffsets) {
const auto childOffset = childIdx * kFieldWidth;
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
writeLongDecimal(
data[i] + childOffset,
nulls[i],
variableWidthOffsets[i],
decoded.valueAt<int128_t>(rows[i]));
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
writeLongDecimal(
data[i] + childOffset,
nulls[i],
variableWidthOffsets[i],
decoded.valueAt<int128_t>(rows[i]));
}
}
}
}
} // namespace

// static
Expand Down Expand Up @@ -78,7 +301,11 @@ void UnsafeRowFast::initialize(const TypePtr& type) {
auto rowBase = base->as<RowVector>();
for (const auto& child : rowBase->children()) {
children_.push_back(UnsafeRowFast(child));
childIsFixedWidth_.push_back(isFixedWidth(child->type()));
const auto childIsFixedWidth = isFixedWidth(child->type());
childIsFixedWidth_.push_back(childIsFixedWidth);
if (!childIsFixedWidth) {
hasVariableWidth_ = true;
}
}

rowNullBytes_ = alignBits(type->size());
Expand Down Expand Up @@ -155,6 +382,18 @@ int32_t UnsafeRowFast::serialize(vector_size_t index, char* buffer) {
return serializeRow(index, buffer);
}

void UnsafeRowFast::serialize(
vector_size_t offset,
vector_size_t size,
char* buffer,
const size_t* bufferOffsets) {
if (size == 1) {
(void)serializeRow(offset, buffer + *bufferOffsets);
return;
}
return serializeRow(offset, size, buffer, bufferOffsets);
}

void UnsafeRowFast::serializeFixedWidth(vector_size_t index, char* buffer) {
VELOX_DCHECK(fixedWidthTypeKind_);
switch (typeKind_) {
Expand Down Expand Up @@ -408,4 +647,75 @@ int32_t UnsafeRowFast::serializeRow(vector_size_t index, char* buffer) {

return variableWidthOffset;
}

void UnsafeRowFast::serializeRow(
vector_size_t offset,
vector_size_t size,
char* buffer,
const size_t* bufferOffsets) {
raw_vector<vector_size_t> rows(size);
raw_vector<char*> nulls(size);
raw_vector<char*> data(size);
if (decoded_.isIdentityMapping()) {
std::iota(rows.begin(), rows.end(), offset);
} else {
for (auto i = 0; i < size; ++i) {
rows[i] = decoded_.index(offset + i);
}
}

// After serializing variable-width column, the 'variableWidthOffsets' are
// updated accordingly.
std::vector<size_t> variableWidthOffsets;
if (hasVariableWidth_) {
variableWidthOffsets.resize(size);
}

const size_t fixedFieldLength = kFieldWidth * children_.size();
for (auto i = 0; i < size; ++i) {
nulls[i] = buffer + bufferOffsets[i];
data[i] = buffer + bufferOffsets[i] + rowNullBytes_;
if (hasVariableWidth_) {
variableWidthOffsets[i] = rowNullBytes_ + fixedFieldLength;
}
}

// Fixed-width and varchar/varbinary types are serialized using the vectorized
// API 'serializedTyped'. Other data types are serialized row-by-row.
for (auto childIdx = 0; childIdx < children_.size(); ++childIdx) {
auto& child = children_[childIdx];
if (childIsFixedWidth_[childIdx] || child.typeKind_ == TypeKind::HUGEINT ||
child.typeKind_ == TypeKind::VARBINARY ||
child.typeKind_ == TypeKind::VARCHAR) {
VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
serializeTyped,
child.typeKind_,
rows,
childIdx,
child.decoded_,
child.valueBytes_,
nulls,
data,
variableWidthOffsets);
} else {
const auto mayHaveNulls = child.decoded_.mayHaveNulls();
const auto childOffset = childIdx * kFieldWidth;
for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && child.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write non-null variable-width value.
auto size = child.serializeVariableWidth(
rows[i], nulls[i] + variableWidthOffsets[i]);
// Write size and offset.
uint64_t sizeAndOffset = variableWidthOffsets[i] << 32 | size;
*reinterpret_cast<uint64_t*>(data[i] + childOffset) = sizeAndOffset;

variableWidthOffsets[i] += alignBytes(size);
}
}
}
}
}

} // namespace facebook::velox::row
23 changes: 23 additions & 0 deletions velox/row/UnsafeRowFast.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ class UnsafeRowFast {
/// 'buffer' must have sufficient capacity and set to all zeros.
int32_t serialize(vector_size_t index, char* buffer);

/// Serializes rows in the range [offset, offset + size) into 'buffer' at
/// given 'bufferOffsets'. 'buffer' must have sufficient capacity and set to
/// all zeros for null-bits handling. 'bufferOffsets' must be pre-filled with
/// the write offsets for each row and must be accessible for 'size' elements.
/// The caller must ensure that the space between each offset in
/// 'bufferOffsets' is no less than the 'fixedRowSize' or 'rowSize'.
void serialize(
vector_size_t offset,
vector_size_t size,
char* buffer,
const size_t* bufferOffsets);

protected:
explicit UnsafeRowFast(const VectorPtr& vector);

Expand Down Expand Up @@ -101,6 +113,14 @@ class UnsafeRowFast {
/// Serializes struct value to buffer. Value must not be null.
int32_t serializeRow(vector_size_t index, char* buffer);

/// Serializes struct values in range [offset, offset + size) to buffer.
/// Value must not be null.
void serializeRow(
vector_size_t offset,
vector_size_t size,
char* buffer,
const size_t* bufferOffsets);

const TypeKind typeKind_;
DecodedVector decoded_;

Expand All @@ -120,5 +140,8 @@ class UnsafeRowFast {

// Fixed-width types only. Number of bytes used for a single value.
size_t valueBytes_;

// ROW type only. True if children have variable-width type.
bool hasVariableWidth_{false};
};
} // namespace facebook::velox::row
Loading

0 comments on commit eb8a021

Please sign in to comment.