From 77013d96edcf379864251fac1a3b2b34f0cec4e5 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Fri, 9 Aug 2024 07:07:05 +0000 Subject: [PATCH] CompactRow support serialize a row range --- velox/docs/develop/serde/compactrow.rst | 3 + velox/row/CMakeLists.txt | 4 + velox/row/CompactRow.cpp | 220 ++++++++++++++++-- velox/row/CompactRow.h | 21 ++ velox/row/benchmarks/CMakeLists.txt | 24 ++ .../DynamicRowVectorDeserializeBenchmark.cpp | 0 .../UnsafeRowSerializeBenchmark.cpp | 121 +++++++--- velox/row/tests/CompactRowTest.cpp | 48 ++-- 8 files changed, 384 insertions(+), 57 deletions(-) create mode 100644 velox/row/benchmarks/CMakeLists.txt rename velox/row/{benchmark => benchmarks}/DynamicRowVectorDeserializeBenchmark.cpp (100%) rename velox/row/{benchmark => benchmarks}/UnsafeRowSerializeBenchmark.cpp (70%) diff --git a/velox/docs/develop/serde/compactrow.rst b/velox/docs/develop/serde/compactrow.rst index bb40c909b87e3..230a2744bc10f 100644 --- a/velox/docs/develop/serde/compactrow.rst +++ b/velox/docs/develop/serde/compactrow.rst @@ -38,6 +38,9 @@ TIMESTAMP 8 UNKNOWN 0 ================ ============================================== +Timestamps are serialized with microsecond precision to align with Spark's +handling of timestamps. + Strings (VARCHAR and VARBINARY) use 4 bytes for size plus the length of the string. Empty string uses 4 bytes. 1-character string uses 5 bytes. 20-character ASCII string uses 24 bytes. 
Null strings do not take up space diff --git a/velox/row/CMakeLists.txt b/velox/row/CMakeLists.txt index 9eb9cb524682e..91e0266dcf7ae 100644 --- a/velox/row/CMakeLists.txt +++ b/velox/row/CMakeLists.txt @@ -19,3 +19,7 @@ velox_link_libraries(velox_row_fast PUBLIC velox_vector) if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() + +if(${VELOX_ENABLE_BENCHMARKS}) + add_subdirectory(benchmarks) +endif() diff --git a/velox/row/CompactRow.cpp b/velox/row/CompactRow.cpp index e692e490df2bf..8dee6fe80b7c7 100644 --- a/velox/row/CompactRow.cpp +++ b/velox/row/CompactRow.cpp @@ -17,6 +17,147 @@ #include "velox/vector/FlatVector.h" namespace facebook::velox::row { +namespace { +constexpr size_t kSizeBytes = sizeof(int32_t); + +void writeInt32(char* buffer, int32_t n) { + memcpy(buffer, &n, kSizeBytes); +} + +int32_t readInt32(const char* buffer) { + int32_t n; + memcpy(&n, buffer, kSizeBytes); + return n; +} + +// Serialize a child vector of a row type within a list of rows. +// Write the serialized data at offsets of buffer row by row. +// Update offsets with the actual serialized size. +template +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t valueBytes, + const raw_vector& nulls, + char* buffer, + std::vector& offsets) { + const auto mayHaveNulls = decoded.mayHaveNulls(); + const auto* rawData = decoded.data(); + + for (auto i = 0; i < rows.size(); ++i) { + if (mayHaveNulls && decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + // Write fixed-width value. 
+ memcpy( + buffer + offsets[i], + rawData + decoded.index(rows[i]) * valueBytes, + valueBytes); + } + offsets[i] += valueBytes; + } +} + +template <> +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + DecodedVector& /* unused */, + size_t /* unused */, + const raw_vector& nulls, + char* /* unused */, + std::vector& /* unused */) { + for (auto i = 0; i < rows.size(); ++i) { + bits::setBit(nulls[i], childIdx, true); + } +} + +template <> +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t /* unused */, + const raw_vector& nulls, + char* buffer, + std::vector& offsets) { + const auto mayHaveNulls = decoded.mayHaveNulls(); + auto* byte = reinterpret_cast(buffer); + + for (auto i = 0; i < rows.size(); ++i) { + if (mayHaveNulls && decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + // Write 1 byte for bool type. + byte[offsets[i]] = decoded.valueAt(rows[i]); + } + offsets[i] += 1; + } +} + +template <> +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t /* unused */, + const raw_vector& nulls, + char* buffer, + std::vector& offsets) { + const auto mayHaveNulls = decoded.mayHaveNulls(); + const auto* rawData = decoded.data(); + + for (auto i = 0; i < rows.size(); ++i) { + if (mayHaveNulls && decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + // Write micros(int64_t) for timestamp value. 
+ auto micros = rawData[rows[i]].toMicros(); + memcpy(buffer + offsets[i], &micros, sizeof(int64_t)); + } + offsets[i] += sizeof(int64_t); + } +} + +template <> +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t valueBytes, + const raw_vector& nulls, + char* buffer, + std::vector& offsets) { + const auto mayHaveNulls = decoded.mayHaveNulls(); + + for (auto i = 0; i < rows.size(); ++i) { + if (mayHaveNulls && decoded.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + auto value = decoded.valueAt(rows[i]); + writeInt32(buffer + offsets[i], value.size()); + if (!value.empty()) { + memcpy(buffer + offsets[i] + kSizeBytes, value.data(), value.size()); + } + offsets[i] += kSizeBytes + value.size(); + } + } +} + +template <> +void serializeTyped( + const raw_vector& rows, + uint32_t childIdx, + DecodedVector& decoded, + size_t valueBytes, + const raw_vector& nulls, + char* buffer, + std::vector& offsets) { + serializeTyped( + rows, childIdx, decoded, valueBytes, nulls, buffer, offsets); +} +} // namespace CompactRow::CompactRow(const RowVectorPtr& vector) : typeKind_{vector->typeKind()}, decoded_{*vector} { @@ -184,6 +325,62 @@ int32_t CompactRow::serializeRow(vector_size_t index, char* buffer) { return valuesOffset; } +void CompactRow::serializeRow( + vector_size_t offset, + vector_size_t size, + char* buffer, + const std::vector& bufferOffsets) { + VELOX_CHECK_EQ(bufferOffsets.size(), size); + + raw_vector rows(size); + raw_vector nulls(size); + if (decoded_.isIdentityMapping()) { + std::iota(rows.begin(), rows.end(), offset); + } else { + for (auto i = 0; i < size; ++i) { + rows[i] = decoded_.index(offset + i); + } + } + + // After serializing each column, the 'offsets' are updated accordingly. 
+ std::vector offsets = bufferOffsets; + auto* base = reinterpret_cast(buffer); + for (auto i = 0; i < size; ++i) { + nulls[i] = base + offsets[i]; + offsets[i] += rowNullBytes_; + } + + for (auto childIdx = 0; childIdx < children_.size(); ++childIdx) { + auto& child = children_[childIdx]; + if (childIsFixedWidth_[childIdx] || + child.typeKind_ == TypeKind::VARBINARY || + child.typeKind_ == TypeKind::VARCHAR) { + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + serializeTyped, + child.typeKind_, + rows, + childIdx, + child.decoded_, + child.valueBytes_, + nulls, + buffer, + offsets); + } else { + auto mayHaveNulls = child.decoded_.mayHaveNulls(); + for (auto i = 0; i < rows.size(); ++i) { + if (mayHaveNulls && child.isNullAt(rows[i])) { + bits::setBit(nulls[i], childIdx, true); + } else { + // Write non-null variable-width value. + auto bytes = + child.serializeVariableWidth(rows[i], buffer + offsets[i]); + offsets[i] += bytes; + } + } + } + } +} + bool CompactRow::isNullAt(vector_size_t index) { return decoded_.isNullAt(index); } @@ -281,21 +478,6 @@ int32_t CompactRow::serializeArray(vector_size_t index, char* buffer) { children_[0], offset, size, childIsFixedWidth_[0], buffer); } -namespace { - -constexpr size_t kSizeBytes = sizeof(int32_t); - -void writeInt32(char* buffer, int32_t n) { - memcpy(buffer, &n, sizeof(int32_t)); -} - -int32_t readInt32(const char* buffer) { - int32_t n; - memcpy(&n, buffer, sizeof(int32_t)); - return n; -} -} // namespace - int32_t CompactRow::serializeAsArray( CompactRow& elements, vector_size_t offset, @@ -420,6 +602,14 @@ int32_t CompactRow::serialize(vector_size_t index, char* buffer) { return serializeRow(index, buffer); } +void CompactRow::serialize( + vector_size_t offset, + vector_size_t size, + char* buffer, + const std::vector& bufferOffsets) { + return serializeRow(offset, size, buffer, bufferOffsets); +} + void CompactRow::serializeFixedWidth(vector_size_t index, char* buffer) { VELOX_DCHECK(fixedWidthTypeKind_); switch 
(typeKind_) { diff --git a/velox/row/CompactRow.h b/velox/row/CompactRow.h index 9abaed0bdee23..0b9db0496a846 100644 --- a/velox/row/CompactRow.h +++ b/velox/row/CompactRow.h @@ -15,6 +15,7 @@ */ #pragma once +#include "velox/common/base/RawVector.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/DecodedVector.h" @@ -36,6 +37,18 @@ class CompactRow { /// 'buffer' must have sufficient capacity and set to all zeros. int32_t serialize(vector_size_t index, char* buffer); + /// Serializes rows in range [offset, offset + size) into 'buffer' at given + /// 'bufferOffsets'. 'buffer' must have sufficient capacity and set to all + /// zeros. 'bufferOffsets' must be pre-filled with the write offsets for each + /// row and must have the same number of elements as the 'size' parameter. + /// The caller must ensure that the space between each offset in + /// 'bufferOffsets' is no less than the 'fixedRowSize' or 'rowSize'. + void serialize( + vector_size_t offset, + vector_size_t size, + char* buffer, + const std::vector& bufferOffsets); + /// Deserializes multiple rows into a RowVector of specified type. The type /// must match the contents of the serialized rows. static RowVectorPtr deserialize( @@ -108,6 +121,14 @@ class CompactRow { /// Serializes struct value to buffer. Value must not be null. int32_t serializeRow(vector_size_t index, char* buffer); + /// Serializes struct values in range [offset, offset + size) to buffer. + /// Value must not be null. + void serializeRow( + vector_size_t offset, + vector_size_t size, + char* buffer, + const std::vector& bufferOffsets); + const TypeKind typeKind_; DecodedVector decoded_; diff --git a/velox/row/benchmarks/CMakeLists.txt b/velox/row/benchmarks/CMakeLists.txt new file mode 100644 index 0000000000000..34e8631007225 --- /dev/null +++ b/velox/row/benchmarks/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (c) Facebook, Inc. and its affiliates. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_unsafe_row_serialize_benchmark + UnsafeRowSerializeBenchmark.cpp) +target_link_libraries( + velox_unsafe_row_serialize_benchmark + PRIVATE + velox_exec + velox_row_fast + velox_vector_fuzzer + Folly::folly + ${FOLLY_BENCHMARK}) diff --git a/velox/row/benchmark/DynamicRowVectorDeserializeBenchmark.cpp b/velox/row/benchmarks/DynamicRowVectorDeserializeBenchmark.cpp similarity index 100% rename from velox/row/benchmark/DynamicRowVectorDeserializeBenchmark.cpp rename to velox/row/benchmarks/DynamicRowVectorDeserializeBenchmark.cpp diff --git a/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp b/velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp similarity index 70% rename from velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp rename to velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp index 388908ecd7187..05ef7fca898fa 100644 --- a/velox/row/benchmark/UnsafeRowSerializeBenchmark.cpp +++ b/velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp @@ -65,6 +65,24 @@ class SerializeBenchmark { VELOX_CHECK_EQ(serialized.size(), data->size()); } + void serializeCompactRange(const RowTypePtr& rowType) { + folly::BenchmarkSuspender suspender; + auto data = makeData(rowType); + suspender.dismiss(); + + auto numRows = data->size(); + std::vector rowSize(numRows); + std::vector offsets(numRows); + + CompactRow compact(data); + auto totalSize = + computeTotalSize(compact, rowType, numRows, rowSize, offsets); 
+ auto buffer = AlignedBuffer::allocate(totalSize, pool(), 0); + auto serialized = + serialize(compact, data->size(), buffer, rowSize, offsets); + VELOX_CHECK_EQ(serialized.size(), data->size()); + } + void deserializeCompact(const RowTypePtr& rowType) { folly::BenchmarkSuspender suspender; auto data = makeData(rowType); @@ -184,6 +202,46 @@ class SerializeBenchmark { return serialized; } + size_t computeTotalSize( + CompactRow& compactRow, + const RowTypePtr& rowType, + vector_size_t numRows, + std::vector& rowSize, + std::vector& offsets) { + size_t totalSize = 0; + if (auto fixedRowSize = CompactRow::fixedRowSize(rowType)) { + totalSize = fixedRowSize.value() * numRows; + for (auto i = 0; i < numRows; ++i) { + rowSize[i] = fixedRowSize.value(); + offsets[i] = fixedRowSize.value() * i; + } + } else { + for (auto i = 0; i < numRows; ++i) { + rowSize[i] = compactRow.rowSize(i); + offsets[i] = totalSize; + totalSize += rowSize[i]; + } + } + return totalSize; + } + + std::vector serialize( + CompactRow& compactRow, + vector_size_t numRows, + BufferPtr& buffer, + std::vector& rowSize, + std::vector& offsets) { + auto rawBuffer = buffer->asMutable(); + compactRow.serialize(0, numRows, rawBuffer, offsets); + + std::vector serialized; + for (auto i = 0; i < numRows; ++i) { + serialized.push_back( + std::string_view(rawBuffer + offsets[i], rowSize[i])); + } + return serialized; + } + HashStringAllocator::Position serialize( const RowVectorPtr& data, HashStringAllocator& allocator) { @@ -205,35 +263,40 @@ class SerializeBenchmark { memory::memoryManager()->addLeafPool()}; }; -#define SERDE_BENCHMARKS(name, rowType) \ - BENCHMARK(unsafe_serialize_##name) { \ - SerializeBenchmark benchmark; \ - benchmark.serializeUnsafe(rowType); \ - } \ - \ - BENCHMARK(compact_serialize_##name) { \ - SerializeBenchmark benchmark; \ - benchmark.serializeCompact(rowType); \ - } \ - \ - BENCHMARK(container_serialize_##name) { \ - SerializeBenchmark benchmark; \ - 
benchmark.serializeContainer(rowType); \ - } \ - \ - BENCHMARK(unsafe_deserialize_##name) { \ - SerializeBenchmark benchmark; \ - benchmark.deserializeUnsafe(rowType); \ - } \ - \ - BENCHMARK(compact_deserialize_##name) { \ - SerializeBenchmark benchmark; \ - benchmark.deserializeCompact(rowType); \ - } \ - \ - BENCHMARK(container_deserialize_##name) { \ - SerializeBenchmark benchmark; \ - benchmark.deserializeContainer(rowType); \ +#define SERDE_BENCHMARKS(name, rowType) \ + BENCHMARK(unsafe_serialize_##name) { \ + SerializeBenchmark benchmark; \ + benchmark.serializeUnsafe(rowType); \ + } \ + \ + BENCHMARK(compact_serialize_##name) { \ + SerializeBenchmark benchmark; \ + benchmark.serializeCompact(rowType); \ + } \ + \ + BENCHMARK(compact_serialize_range_##name) { \ + SerializeBenchmark benchmark; \ + benchmark.serializeCompactRange(rowType); \ + } \ + \ + BENCHMARK(container_serialize_##name) { \ + SerializeBenchmark benchmark; \ + benchmark.serializeContainer(rowType); \ + } \ + \ + BENCHMARK(unsafe_deserialize_##name) { \ + SerializeBenchmark benchmark; \ + benchmark.deserializeUnsafe(rowType); \ + } \ + \ + BENCHMARK(compact_deserialize_##name) { \ + SerializeBenchmark benchmark; \ + benchmark.deserializeCompact(rowType); \ + } \ + \ + BENCHMARK(container_deserialize_##name) { \ + SerializeBenchmark benchmark; \ + benchmark.deserializeContainer(rowType); \ } SERDE_BENCHMARKS( diff --git a/velox/row/tests/CompactRowTest.cpp b/velox/row/tests/CompactRowTest.cpp index 00ef636a0e309..6940aa10e0553 100644 --- a/velox/row/tests/CompactRowTest.cpp +++ b/velox/row/tests/CompactRowTest.cpp @@ -49,35 +49,57 @@ class CompactRowTest : public ::testing::Test, public VectorTestBase { auto rowType = asRowType(data->type()); auto numRows = data->size(); + std::vector rowSize(numRows); + std::vector offsets(numRows); CompactRow row(data); size_t totalSize = 0; if (auto fixedRowSize = CompactRow::fixedRowSize(rowType)) { totalSize = fixedRowSize.value() * numRows; + for (auto 
i = 0; i < numRows; ++i) { + rowSize[i] = fixedRowSize.value(); + offsets[i] = fixedRowSize.value() * i; + } } else { for (auto i = 0; i < numRows; ++i) { - totalSize += row.rowSize(i); + rowSize[i] = row.rowSize(i); + offsets[i] = totalSize; + totalSize += rowSize[i]; } } - std::vector serialized; - BufferPtr buffer = AlignedBuffer::allocate(totalSize, pool(), 0); auto* rawBuffer = buffer->asMutable(); - size_t offset = 0; - for (auto i = 0; i < numRows; ++i) { - auto size = row.serialize(i, rawBuffer + offset); - serialized.push_back(std::string_view(rawBuffer + offset, size)); - offset += size; + { + size_t offset = 0; + std::vector serialized; + for (auto i = 0; i < numRows; ++i) { + auto size = row.serialize(i, rawBuffer + offset); + serialized.push_back(std::string_view(rawBuffer + offset, size)); + offset += size; - VELOX_CHECK_EQ(size, row.rowSize(i), "Row {}: {}", i, data->toString(i)); - } + VELOX_CHECK_EQ( + size, row.rowSize(i), "Row {}: {}", i, data->toString(i)); + } - VELOX_CHECK_EQ(offset, totalSize); + VELOX_CHECK_EQ(offset, totalSize); - auto copy = CompactRow::deserialize(serialized, rowType, pool()); - assertEqualVectors(data, copy); + auto copy = CompactRow::deserialize(serialized, rowType, pool()); + assertEqualVectors(data, copy); + } + + memset(rawBuffer, 0, totalSize); + { + std::vector serialized; + row.serialize(0, numRows, rawBuffer, offsets); + for (auto i = 0; i < numRows; ++i) { + serialized.push_back( + std::string_view(rawBuffer + offsets[i], rowSize[i])); + } + auto copy = CompactRow::deserialize(serialized, rowType, pool()); + assertEqualVectors(data, copy); + } } };