From e3996d0932af3716d56dfe37cae2bc444d29703e Mon Sep 17 00:00:00 2001
From: Rong Ma
Date: Fri, 9 Aug 2024 07:07:05 +0000
Subject: [PATCH] CompactRow support serialize a row range

---
 velox/row/CMakeLists.txt                      |   4 +
 velox/row/CompactRow.cpp                      | 248 ++++++++++++++++--
 velox/row/CompactRow.h                        |  18 ++
 velox/row/benchmark/CMakeLists.txt            |  23 ++
 .../benchmark/UnsafeRowSerializeBenchmark.cpp | 123 ++++++---
 velox/row/tests/CompactRowTest.cpp            |  49 +++-
 6 files changed, 410 insertions(+), 55 deletions(-)
 create mode 100644 velox/row/benchmark/CMakeLists.txt

diff --git a/velox/row/CMakeLists.txt b/velox/row/CMakeLists.txt
index 9eb9cb524682e..b08369026f72b 100644
--- a/velox/row/CMakeLists.txt
+++ b/velox/row/CMakeLists.txt
@@ -19,3 +19,7 @@ velox_link_libraries(velox_row_fast PUBLIC velox_vector)
 if(${VELOX_BUILD_TESTING})
   add_subdirectory(tests)
 endif()
+
+if(${VELOX_ENABLE_BENCHMARKS})
+  add_subdirectory(benchmark)
+endif()
diff --git a/velox/row/CompactRow.cpp b/velox/row/CompactRow.cpp
index e692e490df2bf..d35433a34ecc6 100644
--- a/velox/row/CompactRow.cpp
+++ b/velox/row/CompactRow.cpp
@@ -17,6 +17,174 @@
 #include "velox/vector/FlatVector.h"
 
 namespace facebook::velox::row {
+namespace {
+// Size, in bytes, of an int32_t length field.
+constexpr size_t kSizeBytes = sizeof(int32_t);
+
+// Copies 'n' into 'buffer' with memcpy.
+void writeInt32(char* buffer, int32_t n) {
+  memcpy(buffer, &n, sizeof(int32_t));
+}
+
+// Returns the int32_t stored at 'buffer', read with memcpy.
+int32_t readInt32(const char* buffer) {
+  int32_t n;
+  memcpy(&n, buffer, sizeof(int32_t));
+  return n;
+}
+
+// Serializes one fixed-width child column for a batch of rows.
+//
+// For each i, copies 'valueBytes' bytes of the value at child row 'rows[i]'
+// to 'buffer' + 'offsets[i]' and advances 'offsets[i]' by 'valueBytes'. A
+// null value is not copied; bit 'childIdx' is set in the null flags at
+// 'nulls[i]' instead, and 'offsets[i]' is advanced all the same.
+template <TypeKind kind>
+void serializeTyped(
+    const raw_vector<vector_size_t>& rows,
+    uint32_t childIdx,
+    DecodedVector& decoded,
+    size_t valueBytes,
+    const raw_vector<uint8_t*>& nulls,
+    char* buffer,
+    std::vector<size_t>& offsets) {
+  const auto mayHaveNulls = decoded.mayHaveNulls();
+  const auto* rawData = decoded.data<char>();
+
+  for (auto i = 0; i < rows.size(); ++i) {
+    if (mayHaveNulls && decoded.isNullAt(rows[i])) {
+      bits::setBit(nulls[i], childIdx, true);
+    } else {
+      // Write fixed-width value.
+      memcpy(
+          buffer + offsets[i],
+          rawData + decoded.index(rows[i]) * valueBytes,
+          valueBytes);
+    }
+    offsets[i] += valueBytes;
+  }
+}
+
+// UNKNOWN: flags every value as null. Nothing is written to 'buffer' and
+// 'offsets' is left unchanged.
+template <>
+void serializeTyped<TypeKind::UNKNOWN>(
+    const raw_vector<vector_size_t>& rows,
+    uint32_t childIdx,
+    DecodedVector& decoded,
+    size_t valueBytes,
+    const raw_vector<uint8_t*>& nulls,
+    char* buffer,
+    std::vector<size_t>& offsets) {
+  for (auto i = 0; i < rows.size(); ++i) {
+    bits::setBit(nulls[i], childIdx, true);
+  }
+}
+
+// BOOLEAN: a non-null value is read with DecodedVector::valueAt() and stored
+// as a 'bool' at 'buffer' + 'offsets[i]'.
+template <>
+void serializeTyped<TypeKind::BOOLEAN>(
+    const raw_vector<vector_size_t>& rows,
+    uint32_t childIdx,
+    DecodedVector& decoded,
+    size_t valueBytes,
+    const raw_vector<uint8_t*>& nulls,
+    char* buffer,
+    std::vector<size_t>& offsets) {
+  const auto mayHaveNulls = decoded.mayHaveNulls();
+
+  for (auto i = 0; i < rows.size(); ++i) {
+    if (mayHaveNulls && decoded.isNullAt(rows[i])) {
+      bits::setBit(nulls[i], childIdx, true);
+    } else {
+      // Write fixed-width value.
+      reinterpret_cast<bool*>(buffer)[offsets[i]] =
+          decoded.valueAt<bool>(rows[i]);
+    }
+    offsets[i] += valueBytes;
+  }
+}
+
+// TIMESTAMP: a non-null value is stored as int64_t Timestamp::toMicros().
+template <>
+void serializeTyped<TypeKind::TIMESTAMP>(
+    const raw_vector<vector_size_t>& rows,
+    uint32_t childIdx,
+    DecodedVector& decoded,
+    size_t valueBytes,
+    const raw_vector<uint8_t*>& nulls,
+    char* buffer,
+    std::vector<size_t>& offsets) {
+  const auto mayHaveNulls = decoded.mayHaveNulls();
+  const auto* rawData = decoded.data<Timestamp>();
+
+  for (auto i = 0; i < rows.size(); ++i) {
+    if (mayHaveNulls && decoded.isNullAt(rows[i])) {
+      bits::setBit(nulls[i], childIdx, true);
+    } else {
+      // Index 'rawData' via decoded.index(), like the generic version does.
+      auto micros = rawData[decoded.index(rows[i])].toMicros();
+      memcpy(buffer + offsets[i], &micros, sizeof(int64_t));
+    }
+    offsets[i] += valueBytes;
+  }
+}
+
+// Serializes one VARCHAR or VARBINARY child column for a batch of rows.
+//
+// A non-null value is written at 'buffer' + 'offsets[i]' as an int32_t size
+// followed by the bytes, and 'offsets[i]' is advanced past both. A null
+// value only sets bit 'childIdx' in the null flags at 'nulls[i]'.
+void serializeVarcharAndVarBinary(
+    const raw_vector<vector_size_t>& rows,
+    uint32_t childIdx,
+    DecodedVector& decoded,
+    const raw_vector<uint8_t*>& nulls,
+    char* buffer,
+    std::vector<size_t>& offsets) {
+  const auto mayHaveNulls = decoded.mayHaveNulls();
+
+  for (auto i = 0; i < rows.size(); ++i) {
+    if (mayHaveNulls && decoded.isNullAt(rows[i])) {
+      bits::setBit(nulls[i], childIdx, true);
+    } else {
+      auto value = decoded.valueAt<StringView>(rows[i]);
+      writeInt32(buffer + offsets[i], value.size());
+      if (!value.empty()) {
+        memcpy(buffer + offsets[i] + kSizeBytes, value.data(), value.size());
+      }
+      offsets[i] += kSizeBytes + value.size();
+    }
+  }
+}
+
+// VARCHAR: 'valueBytes' is unused; see serializeVarcharAndVarBinary().
+template <>
+void serializeTyped<TypeKind::VARCHAR>(
+    const raw_vector<vector_size_t>& rows,
+    uint32_t childIdx,
+    DecodedVector& decoded,
+    size_t valueBytes,
+    const raw_vector<uint8_t*>& nulls,
+    char* buffer,
+    std::vector<size_t>& offsets) {
+  serializeVarcharAndVarBinary(rows, childIdx, decoded, nulls, buffer, offsets);
+}
+
+// VARBINARY: 'valueBytes' is unused; see serializeVarcharAndVarBinary().
+template <>
+void serializeTyped<TypeKind::VARBINARY>(
+    const raw_vector<vector_size_t>& rows,
+    uint32_t childIdx,
+    DecodedVector& decoded,
+    size_t valueBytes,
+    const raw_vector<uint8_t*>& nulls,
+    char* buffer,
+    std::vector<size_t>& offsets) {
+  serializeVarcharAndVarBinary(rows, childIdx, decoded, nulls, buffer, offsets);
+}
+} // namespace
 
 CompactRow::CompactRow(const RowVectorPtr& vector)
     : typeKind_{vector->typeKind()}, decoded_{*vector} {
@@ -184,6 +352,64 @@ int32_t CompactRow::serializeRow(vector_size_t index, char* buffer) {
   return valuesOffset;
 }
 
+void CompactRow::serializeRow(
+    const IndexRange& indexRange,
+    char* buffer,
+    std::vector<size_t>& offsets) {
+  VELOX_CHECK_EQ(offsets.size(), indexRange.size);
+
+  // Translate the requested range through 'decoded_' once, up front.
+  raw_vector<vector_size_t> rows(indexRange.size);
+  raw_vector<uint8_t*> nulls(indexRange.size);
+  if (decoded_.isIdentityMapping()) {
+    std::iota(rows.begin(), rows.end(), indexRange.begin);
+  } else {
+    for (auto i = 0; i < indexRange.size; ++i) {
+      rows[i] = decoded_.index(indexRange.begin + i);
+    }
+  }
+
+  // Each row starts with 'rowNullBytes_' bytes of null flags. Remember where
+  // they are and move the row's offset past them.
+  auto* base = reinterpret_cast<uint8_t*>(buffer);
+  for (auto i = 0; i < indexRange.size; ++i) {
+    nulls[i] = base + offsets[i];
+    offsets[i] += rowNullBytes_;
+  }
+
+  // Serialize column by column; each child appends to every row and advances
+  // that row's offset.
+  for (auto childIdx = 0; childIdx < children_.size(); ++childIdx) {
+    auto& child = children_[childIdx];
+    if (childIsFixedWidth_[childIdx] ||
+        child.typeKind_ == TypeKind::VARBINARY ||
+        child.typeKind_ == TypeKind::VARCHAR) {
+      VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
+          serializeTyped,
+          child.typeKind_,
+          rows,
+          childIdx,
+          child.decoded_,
+          child.valueBytes_,
+          nulls,
+          buffer,
+          offsets);
+    } else {
+      auto mayHaveNulls = child.decoded_.mayHaveNulls();
+      for (auto i = 0; i < indexRange.size; ++i) {
+        if (mayHaveNulls && child.isNullAt(rows[i])) {
+          bits::setBit(nulls[i], childIdx, true);
+        } else {
+          // Write non-null variable-width value.
+          auto size =
+              child.serializeVariableWidth(rows[i], buffer + offsets[i]);
+          offsets[i] += size;
+        }
+      }
+    }
+  }
+}
+
 bool CompactRow::isNullAt(vector_size_t index) {
   return decoded_.isNullAt(index);
 }
@@ -281,21 +507,6 @@ int32_t CompactRow::serializeArray(vector_size_t index, char* buffer) {
       children_[0], offset, size, childIsFixedWidth_[0], buffer);
 }
 
-namespace {
-
-constexpr size_t kSizeBytes = sizeof(int32_t);
-
-void writeInt32(char* buffer, int32_t n) {
-  memcpy(buffer, &n, sizeof(int32_t));
-}
-
-int32_t readInt32(const char* buffer) {
-  int32_t n;
-  memcpy(&n, buffer, sizeof(int32_t));
-  return n;
-}
-} // namespace
-
 int32_t CompactRow::serializeAsArray(
     CompactRow& elements,
     vector_size_t offset,
@@ -420,6 +631,13 @@ int32_t CompactRow::serialize(vector_size_t index, char* buffer) {
   return serializeRow(index, buffer);
 }
 
+void CompactRow::serialize(
+    const IndexRange& indexRange,
+    char* buffer,
+    std::vector<size_t>& offsets) {
+  serializeRow(indexRange, buffer, offsets);
+}
+
 void CompactRow::serializeFixedWidth(vector_size_t index, char* buffer) {
   VELOX_DCHECK(fixedWidthTypeKind_);
   switch (typeKind_) {
diff --git a/velox/row/CompactRow.h b/velox/row/CompactRow.h
index 9abaed0bdee23..205623a86fcc7 100644
--- a/velox/row/CompactRow.h
+++ b/velox/row/CompactRow.h
@@ -17,6 +17,7 @@
 
 #include "velox/vector/ComplexVector.h"
 #include "velox/vector/DecodedVector.h"
+#include "velox/vector/VectorStream.h"
 
 namespace facebook::velox::row {
 
@@ -36,6 +37,15 @@ class CompactRow {
   /// 'buffer' must have sufficient capacity and set to all zeros.
   int32_t serialize(vector_size_t index, char* buffer);
 
+  /// Serializes rows in 'indexRange' into 'buffer'. Row i of the range is
+  /// written at 'offsets[i]'. 'buffer' must have sufficient capacity and be
+  /// set to all zeros. The size of 'offsets' must equal 'indexRange.size'.
+  /// Each 'offsets[i]' is advanced by the number of bytes written for row i.
+  void serialize(
+      const IndexRange& indexRange,
+      char* buffer,
+      std::vector<size_t>& offsets);
+
   /// Deserializes multiple rows into a RowVector of specified type. The type
   /// must match the contents of the serialized rows.
   static RowVectorPtr deserialize(
@@ -108,6 +118,14 @@ class CompactRow {
   /// Serializes struct value to buffer. Value must not be null.
   int32_t serializeRow(vector_size_t index, char* buffer);
 
+  /// Serializes struct values at 'indexRange' to 'buffer'. Row i of the range
+  /// is written at 'offsets[i]', which is then advanced by the number of bytes
+  /// written. Values must not be null.
+  void serializeRow(
+      const IndexRange& indexRange,
+      char* buffer,
+      std::vector<size_t>& offsets);
+
   const TypeKind typeKind_;
   DecodedVector decoded_;
 
diff --git a/velox/row/benchmark/CMakeLists.txt b/velox/row/benchmark/CMakeLists.txt
new file mode 100644
index 0000000000000..f09a5e1169746
--- /dev/null
+++ b/velox/row/benchmark/CMakeLists.txt
@@ -0,0 +1,23 @@
+# Copyright (c) Facebook, Inc. and its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+add_executable(velox_unsafe_row_serialize_benchmark UnsafeRowSerializeBenchmark.cpp)
+target_link_libraries(
+  velox_unsafe_row_serialize_benchmark
+  PRIVATE
+  velox_exec
+  velox_row_fast
+  velox_vector_fuzzer
+  Folly::folly
+  ${FOLLY_BENCHMARK})
diff --git a/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp b/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp
index 3f59de1a814c3..de54874c63511 100644
--- a/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp
+++ b/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp
@@ -65,6 +65,22 @@ class SerializeBenchmark {
     VELOX_CHECK_EQ(serialized.size(), data->size());
   }
 
+  // Serializes all rows with one CompactRow::serialize(IndexRange, ...) call.
+  void serializeCompactRange(const RowTypePtr& rowType) {
+    folly::BenchmarkSuspender suspender;
+    auto data = makeData(rowType);
+    suspender.dismiss();
+
+    auto numRows = data->size();
+    std::vector<size_t> offsets(numRows);
+
+    CompactRow compact(data);
+    auto totalSize = computeTotalSize(compact, rowType, numRows, offsets);
+    auto buffer = AlignedBuffer::allocate<char>(totalSize, pool(), 0);
+    auto serialized = serialize(compact, data->size(), buffer, offsets);
+    VELOX_CHECK_EQ(serialized.size(), data->size());
+  }
+
   void deserializeCompact(const RowTypePtr& rowType) {
     folly::BenchmarkSuspender suspender;
     auto data = makeData(rowType);
@@ -184,6 +200,50 @@ class SerializeBenchmark {
     return serialized;
   }
 
+  // Sets 'offsets[i]' to the byte offset at which row i starts and returns
+  // the total serialized size of the first 'numRows' rows.
+  size_t computeTotalSize(
+      CompactRow& compactRow,
+      const RowTypePtr& rowType,
+      vector_size_t numRows,
+      std::vector<size_t>& offsets) {
+    size_t totalSize = 0;
+    if (auto fixedRowSize = CompactRow::fixedRowSize(rowType)) {
+      totalSize = fixedRowSize.value() * numRows;
+      for (auto i = 0; i < numRows; ++i) {
+        offsets[i] = fixedRowSize.value() * i;
+      }
+    } else {
+      for (auto i = 0; i < numRows; ++i) {
+        offsets[i] = totalSize;
+        totalSize += compactRow.rowSize(i);
+      }
+    }
+    return totalSize;
+  }
+
+  // Serializes rows [0, numRows) into 'buffer' with a single range call and
+  // returns one view per row. On entry 'offsets[i]' is the start of row i; on
+  // return it is the end of row i.
+  std::vector<std::string_view> serialize(
+      CompactRow& compactRow,
+      vector_size_t numRows,
+      BufferPtr& buffer,
+      std::vector<size_t>& offsets) {
+    auto rawBuffer = buffer->asMutable<char>();
+    IndexRange indexRange{0, numRows};
+    compactRow.serialize(indexRange, rawBuffer, offsets);
+    VELOX_CHECK_EQ(buffer->size(), offsets.empty() ? 0 : offsets.back());
+
+    std::vector<std::string_view> serialized;
+    serialized.reserve(numRows);
+    for (auto i = 0; i < numRows; ++i) {
+      const auto start = i == 0 ? 0 : offsets[i - 1];
+      serialized.emplace_back(rawBuffer + start, offsets[i] - start);
+    }
+    return serialized;
+  }
+
   HashStringAllocator::Position serialize(
       const RowVectorPtr& data,
       HashStringAllocator& allocator) {
@@ -205,35 +265,40 @@ class SerializeBenchmark {
       memory::memoryManager()->addLeafPool()};
 };
 
-#define SERDE_BENCHMARKS(name, rowType)      \
-  BENCHMARK(unsafe_serialize_##name) {       \
-    SerializeBenchmark benchmark;            \
-    benchmark.serializeUnsafe(rowType);      \
-  }                                          \
-                                             \
-  BENCHMARK(compact_serialize_##name) {      \
-    SerializeBenchmark benchmark;            \
-    benchmark.serializeCompact(rowType);     \
-  }                                          \
-                                             \
-  BENCHMARK(container_serialize_##name) {    \
-    SerializeBenchmark benchmark;            \
-    benchmark.serializeContainer(rowType);   \
-  }                                          \
-                                             \
-  BENCHMARK(unsafe_deserialize_##name) {     \
-    SerializeBenchmark benchmark;            \
-    benchmark.deserializeUnsafe(rowType);    \
-  }                                          \
-                                             \
-  BENCHMARK(compact_deserialize_##name) {    \
-    SerializeBenchmark benchmark;            \
-    benchmark.deserializeCompact(rowType);   \
-  }                                          \
-                                             \
-  BENCHMARK(container_deserialize_##name) {  \
-    SerializeBenchmark benchmark;            \
-    benchmark.deserializeContainer(rowType); \
+#define SERDE_BENCHMARKS(name, rowType)       \
+  BENCHMARK(unsafe_serialize_##name) {        \
+    SerializeBenchmark benchmark;             \
+    benchmark.serializeUnsafe(rowType);       \
+  }                                           \
+                                              \
+  BENCHMARK(compact_serialize_##name) {       \
+    SerializeBenchmark benchmark;             \
+    benchmark.serializeCompact(rowType);      \
+  }                                           \
+                                              \
+  BENCHMARK(compact_serialize_range_##name) { \
+    SerializeBenchmark benchmark;             \
+    benchmark.serializeCompactRange(rowType); \
+  }                                           \
+                                              \
+  BENCHMARK(container_serialize_##name) {     \
+    SerializeBenchmark benchmark;             \
+    benchmark.serializeContainer(rowType);    \
+  }                                           \
+                                              \
+  BENCHMARK(unsafe_deserialize_##name) {      \
+    SerializeBenchmark benchmark;             \
+    benchmark.deserializeUnsafe(rowType);     \
+  }                                           \
+                                              \
+  BENCHMARK(compact_deserialize_##name) {     \
+    SerializeBenchmark benchmark;             \
+    benchmark.deserializeCompact(rowType);    \
+  }                                           \
+                                              \
+  BENCHMARK(container_deserialize_##name) {   \
+    SerializeBenchmark benchmark;             \
+    benchmark.deserializeContainer(rowType);  \
   }
 
 SERDE_BENCHMARKS(
diff --git a/velox/row/tests/CompactRowTest.cpp b/velox/row/tests/CompactRowTest.cpp
index 00ef636a0e309..b7bf53c23147b 100644
--- a/velox/row/tests/CompactRowTest.cpp
+++ b/velox/row/tests/CompactRowTest.cpp
@@ -49,35 +49,62 @@ class CompactRowTest : public ::testing::Test, public VectorTestBase {
 
     auto rowType = asRowType(data->type());
     auto numRows = data->size();
+    IndexRange range{0, numRows};
+    std::vector<size_t> offsets(numRows);
 
     CompactRow row(data);
 
     size_t totalSize = 0;
     if (auto fixedRowSize = CompactRow::fixedRowSize(rowType)) {
       totalSize = fixedRowSize.value() * numRows;
+      for (auto i = 0; i < numRows; ++i) {
+        offsets[i] = fixedRowSize.value() * i;
+      }
     } else {
       for (auto i = 0; i < numRows; ++i) {
+        offsets[i] = totalSize;
         totalSize += row.rowSize(i);
       }
     }
 
-    std::vector<std::string_view> serialized;
-
     BufferPtr buffer = AlignedBuffer::allocate<char>(totalSize, pool(), 0);
     auto* rawBuffer = buffer->asMutable<char>();
-    size_t offset = 0;
-    for (auto i = 0; i < numRows; ++i) {
-      auto size = row.serialize(i, rawBuffer + offset);
-      serialized.push_back(std::string_view(rawBuffer + offset, size));
-      offset += size;
+    // Round-trip using row-at-a-time serialization.
+    {
+      size_t offset = 0;
+      std::vector<std::string_view> serialized;
+      for (auto i = 0; i < numRows; ++i) {
+        auto size = row.serialize(i, rawBuffer + offset);
+        serialized.push_back(std::string_view(rawBuffer + offset, size));
+        offset += size;
+
+        VELOX_CHECK_EQ(
+            size, row.rowSize(i), "Row {}: {}", i, data->toString(i));
+      }
+
+      VELOX_CHECK_EQ(offset, totalSize);
 
-      VELOX_CHECK_EQ(size, row.rowSize(i), "Row {}: {}", i, data->toString(i));
+      auto copy = CompactRow::deserialize(serialized, rowType, pool());
+      assertEqualVectors(data, copy);
     }
 
-    VELOX_CHECK_EQ(offset, totalSize);
+    // Round-trip using range serialization. serialize() requires a zeroed
+    // buffer, so clear what the first pass wrote.
+    memset(rawBuffer, 0, totalSize);
+    {
+      std::vector<std::string_view> serialized;
+      row.serialize(range, rawBuffer, offsets);
+      serialized.reserve(numRows);
+      for (auto i = 0; i < numRows; ++i) {
+        const auto start = i == 0 ? 0 : offsets[i - 1];
+        serialized.emplace_back(rawBuffer + start, offsets[i] - start);
+      }
 
-    auto copy = CompactRow::deserialize(serialized, rowType, pool());
-    assertEqualVectors(data, copy);
+      VELOX_CHECK_EQ(offsets.empty() ? 0 : offsets.back(), totalSize);
+
+      auto copy = CompactRow::deserialize(serialized, rowType, pool());
+      assertEqualVectors(data, copy);
+    }
   }
 };
 