From eb8a02116efb78a6a7395f3ee2a04e298f4c56f0 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 9 Oct 2024 11:50:08 +0000 Subject: [PATCH] support range serialization for UnsafeRowFast --- velox/row/UnsafeRowFast.cpp | 312 +++++++++++++++++++++++++- velox/row/UnsafeRowFast.h | 23 ++ velox/row/tests/UnsafeRowFuzzTest.cpp | 42 +++- 3 files changed, 370 insertions(+), 7 deletions(-) diff --git a/velox/row/UnsafeRowFast.cpp b/velox/row/UnsafeRowFast.cpp index 2f07c633f536..552fe82ed045 100644 --- a/velox/row/UnsafeRowFast.cpp +++ b/velox/row/UnsafeRowFast.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ #include "velox/row/UnsafeRowFast.h" +#include "velox/common/base/RawVector.h" namespace facebook::velox::row { @@ -31,6 +32,228 @@ int32_t alignBytes(int32_t numBytes) { bool isFixedWidth(const TypePtr& type) { return type->isFixedWidth() && !type->isLongDecimal(); } + +FOLLY_ALWAYS_INLINE void writeFixedWidth( + char* buffer, + const char* rawData, + vector_size_t index, + size_t valueBytes) { + memcpy(buffer, rawData + index * valueBytes, valueBytes); +} + +FOLLY_ALWAYS_INLINE void writeTimestamp( + char* buffer, + const Timestamp& timestamp) { + // Write micros(int64_t) for timestamp value. 
+ const auto micros = timestamp.toMicros(); + memcpy(buffer, &micros, sizeof(int64_t)); +} + +FOLLY_ALWAYS_INLINE void writeString( + char* buffer, + char* rowBase, + size_t& variableWidthOffset, + const StringView& value) { + uint64_t sizeAndOffset = variableWidthOffset << 32 | value.size(); + *reinterpret_cast<uint64_t*>(buffer) = sizeAndOffset; + + if (!value.empty()) { + memcpy(rowBase + variableWidthOffset, value.data(), value.size()); + variableWidthOffset += alignBytes(value.size()); + } +} + +FOLLY_ALWAYS_INLINE void writeLongDecimal( + char* buffer, + char* rowBase, + size_t& variableWidthOffset, + const int128_t& value) { + auto serializedLength = + DecimalUtil::toByteArray(value, rowBase + variableWidthOffset); + uint64_t sizeAndOffset = variableWidthOffset << 32 | serializedLength; + *reinterpret_cast<uint64_t*>(buffer) = sizeAndOffset; + variableWidthOffset += alignBytes(serializedLength); +} + +// Serialize a child vector of a row type within a list of rows. +// Write the serialized data at offsets of buffer row by row. +// Update offsets with the actual serialized size. 
+template <TypeKind Kind> +void serializeTyped( + const raw_vector<vector_size_t>& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t valueBytes, + const raw_vector<char*>& nulls, + const raw_vector<char*>& data, + std::vector<size_t>& /*unused*/) { + const auto* rawData = decoded.data<char>(); + const auto childOffset = childIdx * kFieldWidth; + if (!decoded.mayHaveNulls()) { + for (auto i = 0; i < rows.size(); ++i) { + writeFixedWidth( + data[i] + childOffset, rawData, decoded.index(rows[i]), valueBytes); + } + } else { + for (auto i = 0; i < rows.size(); ++i) { + if (decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + writeFixedWidth( + data[i] + childOffset, rawData, decoded.index(rows[i]), valueBytes); + } + } + } +} + +template <> +void serializeTyped<TypeKind::UNKNOWN>( + const raw_vector<vector_size_t>& rows, + uint32_t childIdx, + DecodedVector& /* unused */, + size_t /* unused */, + const raw_vector<char*>& nulls, + const raw_vector<char*>& /*unused*/, + std::vector<size_t>& /*unused*/) { + for (auto i = 0; i < rows.size(); ++i) { + bits::setBit(nulls[i], childIdx, true); + } +} + +template <> +void serializeTyped<TypeKind::BOOLEAN>( + const raw_vector<vector_size_t>& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t /* unused */, + const raw_vector<char*>& nulls, + const raw_vector<char*>& data, + std::vector<size_t>& /*unused*/) { + const auto childOffset = childIdx * kFieldWidth; + if (!decoded.mayHaveNulls()) { + for (auto i = 0; i < rows.size(); ++i) { + *reinterpret_cast<bool*>(data[i] + childOffset) = + decoded.valueAt<bool>(rows[i]); + } + } else { + for (auto i = 0; i < rows.size(); ++i) { + if (decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + // Write 1 byte for bool type. 
+ *reinterpret_cast<bool*>(data[i] + childOffset) = + decoded.valueAt<bool>(rows[i]); + } + } + } +} + +template <> +void serializeTyped<TypeKind::TIMESTAMP>( + const raw_vector<vector_size_t>& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t /* unused */, + const raw_vector<char*>& nulls, + const raw_vector<char*>& data, + std::vector<size_t>& /*unused*/) { + const auto childOffset = childIdx * kFieldWidth; + const auto* rawData = decoded.data<Timestamp>(); + if (!decoded.mayHaveNulls()) { + for (auto i = 0; i < rows.size(); ++i) { + auto index = decoded.index(rows[i]); + writeTimestamp(data[i] + childOffset, rawData[index]); + } + } else { + for (auto i = 0; i < rows.size(); ++i) { + if (decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + auto index = decoded.index(rows[i]); + writeTimestamp(data[i] + childOffset, rawData[index]); + } + } + } +} + +template <> +void serializeTyped<TypeKind::VARCHAR>( + const raw_vector<vector_size_t>& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t /*unused*/, + const raw_vector<char*>& nulls, + const raw_vector<char*>& data, + std::vector<size_t>& variableWidthOffsets) { + const auto childOffset = childIdx * kFieldWidth; + if (!decoded.mayHaveNulls()) { + for (auto i = 0; i < rows.size(); ++i) { + writeString( + data[i] + childOffset, + nulls[i], + variableWidthOffsets[i], + decoded.valueAt<StringView>(rows[i])); + } + } else { + for (auto i = 0; i < rows.size(); ++i) { + if (decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + writeString( + data[i] + childOffset, + nulls[i], + variableWidthOffsets[i], + decoded.valueAt<StringView>(rows[i])); + } + } + } +} + +template <> +void serializeTyped<TypeKind::VARBINARY>( + const raw_vector<vector_size_t>& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t valueBytes, + const raw_vector<char*>& nulls, + const raw_vector<char*>& data, + std::vector<size_t>& variableWidthOffsets) { + serializeTyped<TypeKind::VARCHAR>( + rows, childIdx, decoded, valueBytes, nulls, data, variableWidthOffsets); +} + +template <> +void serializeTyped<TypeKind::HUGEINT>( + const raw_vector<vector_size_t>& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t /*unused*/, 
+ const raw_vector& nulls, + const raw_vector& data, + std::vector& variableWidthOffsets) { + const auto childOffset = childIdx * kFieldWidth; + if (!decoded.mayHaveNulls()) { + for (auto i = 0; i < rows.size(); ++i) { + writeLongDecimal( + data[i] + childOffset, + nulls[i], + variableWidthOffsets[i], + decoded.valueAt(rows[i])); + } + } else { + for (auto i = 0; i < rows.size(); ++i) { + if (decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + writeLongDecimal( + data[i] + childOffset, + nulls[i], + variableWidthOffsets[i], + decoded.valueAt(rows[i])); + } + } + } +} } // namespace // static @@ -78,7 +301,11 @@ void UnsafeRowFast::initialize(const TypePtr& type) { auto rowBase = base->as(); for (const auto& child : rowBase->children()) { children_.push_back(UnsafeRowFast(child)); - childIsFixedWidth_.push_back(isFixedWidth(child->type())); + const auto childIsFixedWidth = isFixedWidth(child->type()); + childIsFixedWidth_.push_back(childIsFixedWidth); + if (!childIsFixedWidth) { + hasVariableWidth_ = true; + } } rowNullBytes_ = alignBits(type->size()); @@ -155,6 +382,18 @@ int32_t UnsafeRowFast::serialize(vector_size_t index, char* buffer) { return serializeRow(index, buffer); } +void UnsafeRowFast::serialize( + vector_size_t offset, + vector_size_t size, + char* buffer, + const size_t* bufferOffsets) { + if (size == 1) { + (void)serializeRow(offset, buffer + *bufferOffsets); + return; + } + return serializeRow(offset, size, buffer, bufferOffsets); +} + void UnsafeRowFast::serializeFixedWidth(vector_size_t index, char* buffer) { VELOX_DCHECK(fixedWidthTypeKind_); switch (typeKind_) { @@ -408,4 +647,75 @@ int32_t UnsafeRowFast::serializeRow(vector_size_t index, char* buffer) { return variableWidthOffset; } + +void UnsafeRowFast::serializeRow( + vector_size_t offset, + vector_size_t size, + char* buffer, + const size_t* bufferOffsets) { + raw_vector rows(size); + raw_vector nulls(size); + raw_vector data(size); + if 
(decoded_.isIdentityMapping()) { + std::iota(rows.begin(), rows.end(), offset); + } else { + for (auto i = 0; i < size; ++i) { + rows[i] = decoded_.index(offset + i); + } + } + + // After serializing variable-width column, the 'variableWidthOffsets' are + // updated accordingly. + std::vector<size_t> variableWidthOffsets; + if (hasVariableWidth_) { + variableWidthOffsets.resize(size); + } + + const size_t fixedFieldLength = kFieldWidth * children_.size(); + for (auto i = 0; i < size; ++i) { + nulls[i] = buffer + bufferOffsets[i]; + data[i] = buffer + bufferOffsets[i] + rowNullBytes_; + if (hasVariableWidth_) { + variableWidthOffsets[i] = rowNullBytes_ + fixedFieldLength; + } + } + + // Fixed-width and varchar/varbinary types are serialized using the vectorized + // API 'serializedTyped'. Other data types are serialized row-by-row. + for (auto childIdx = 0; childIdx < children_.size(); ++childIdx) { + auto& child = children_[childIdx]; + if (childIsFixedWidth_[childIdx] || child.typeKind_ == TypeKind::HUGEINT || + child.typeKind_ == TypeKind::VARBINARY || + child.typeKind_ == TypeKind::VARCHAR) { + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + serializeTyped, + child.typeKind_, + rows, + childIdx, + child.decoded_, + child.valueBytes_, + nulls, + data, + variableWidthOffsets); + } else { + const auto mayHaveNulls = child.decoded_.mayHaveNulls(); + const auto childOffset = childIdx * kFieldWidth; + for (auto i = 0; i < rows.size(); ++i) { + if (mayHaveNulls && child.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + // Write non-null variable-width value. + auto size = child.serializeVariableWidth( + rows[i], nulls[i] + variableWidthOffsets[i]); + // Write size and offset. 
+ uint64_t sizeAndOffset = variableWidthOffsets[i] << 32 | size; + *reinterpret_cast<uint64_t*>(data[i] + childOffset) = sizeAndOffset; + + variableWidthOffsets[i] += alignBytes(size); + } + } + } + } +} + } // namespace facebook::velox::row diff --git a/velox/row/UnsafeRowFast.h b/velox/row/UnsafeRowFast.h index e82a83912cf4..111b75c363a8 100644 --- a/velox/row/UnsafeRowFast.h +++ b/velox/row/UnsafeRowFast.h @@ -36,6 +36,18 @@ class UnsafeRowFast { /// 'buffer' must have sufficient capacity and set to all zeros. int32_t serialize(vector_size_t index, char* buffer); + /// Serializes rows in the range [offset, offset + size) into 'buffer' at + /// given 'bufferOffsets'. 'buffer' must have sufficient capacity and set to + /// all zeros for null-bits handling. 'bufferOffsets' must be pre-filled with + /// the write offsets for each row and must be accessible for 'size' elements. + /// The caller must ensure that the space between each offset in + /// 'bufferOffsets' is no less than the 'fixedRowSize' or 'rowSize'. + void serialize( + vector_size_t offset, + vector_size_t size, + char* buffer, + const size_t* bufferOffsets); + protected: explicit UnsafeRowFast(const VectorPtr& vector); @@ -101,6 +113,14 @@ class UnsafeRowFast { /// Serializes struct value to buffer. Value must not be null. int32_t serializeRow(vector_size_t index, char* buffer); + /// Serializes struct values in range [offset, offset + size) to buffer. + /// Value must not be null. + void serializeRow( + vector_size_t offset, + vector_size_t size, + char* buffer, + const size_t* bufferOffsets); + const TypeKind typeKind_; DecodedVector decoded_; @@ -120,5 +140,8 @@ // Fixed-width types only. Number of bytes used for a single value. size_t valueBytes_; + + // ROW type only. True if children have variable-width type. 
+ bool hasVariableWidth_{false}; }; } // namespace facebook::velox::row diff --git a/velox/row/tests/UnsafeRowFuzzTest.cpp b/velox/row/tests/UnsafeRowFuzzTest.cpp index 8ce4ce74eb75..ae865f80371e 100644 --- a/velox/row/tests/UnsafeRowFuzzTest.cpp +++ b/velox/row/tests/UnsafeRowFuzzTest.cpp @@ -95,6 +95,8 @@ class UnsafeRowFuzzTests : public ::testing::Test { std::shared_ptr<memory::MemoryPool> pool_ = memory::memoryManager()->addLeafPool(); + + BufferPtr buffer_; }; TEST_F(UnsafeRowFuzzTests, fast) { @@ -164,16 +166,44 @@ TEST_F(UnsafeRowFuzzTests, fast) { doTest(rowType, [&](const RowVectorPtr& data) { std::vector<std::string_view> serialized; - serialized.reserve(data->size()); + const auto numRows = data->size(); + serialized.reserve(numRows); + std::vector<size_t> rowSize(numRows); + std::vector<size_t> offsets(numRows); UnsafeRowFast fast(data); - for (auto i = 0; i < data->size(); ++i) { - auto rowSize = fast.serialize(i, buffers_[i]); - VELOX_CHECK_LE(rowSize, kBufferSize); - EXPECT_EQ(rowSize, fast.rowSize(i)) << i << ", " << data->toString(i); + size_t totalSize = 0; + if (auto fixedRowSize = UnsafeRowFast::fixedRowSize(rowType)) { + totalSize = fixedRowSize.value() * numRows; + for (auto i = 0; i < numRows; ++i) { + rowSize[i] = fixedRowSize.value(); + offsets[i] = fixedRowSize.value() * i; + } + } else { + for (auto i = 0; i < numRows; ++i) { + rowSize[i] = fast.rowSize(i); + offsets[i] = totalSize; + totalSize += rowSize[i]; + } + } + + buffer_ = AlignedBuffer::allocate<char>(totalSize, pool_.get(), 0); + auto* rawBuffer = buffer_->asMutable<char>(); + + vector_size_t offset = 0; + vector_size_t rangeSize = 1; + // Serialize with different range size. 
+ while (offset < numRows) { + auto size = std::min(rangeSize, numRows - offset); + fast.serialize(offset, size, rawBuffer, offsets.data() + offset); + offset += size; + rangeSize = checkedMultiply(rangeSize, 2); + } - serialized.push_back(std::string_view(buffers_[i], rowSize)); + for (auto i = 0; i < numRows; ++i) { + serialized.push_back( + std::string_view(rawBuffer + offsets[i], rowSize[i])); } return serialized; });