From 7e0a5a21eb0e1e87cf5284255e54aca2ae38facf Mon Sep 17 00:00:00 2001 From: Kevin Wilfong Date: Mon, 26 Feb 2024 17:40:57 -0800 Subject: [PATCH] PrestoBatchSerializer should not preserve Dictionary encoding if it makes the data larger (#8688) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/8688 This change adds some basic heuristics which serializeDictionaryVector can use to flatten a Vector as part of serializing it rather than preserving the Dictionary encoding. The checks are: * if the size of the Vector type is smaller than or equal to int32_t (the indices into the dictionary) * if the Vector type is fixed width and we determine that the size of the indices + the size of the alphabet is larger than the size of the original data * regardless of the Vector type, if the alphabet contains unique values This helps to ensure that preserving encodings during serialization won't actually make the serialized data larger. Reviewed By: bikramSingh91 Differential Revision: D53484809 fbshipit-source-id: c7954b827a0a8e946a67d53e5b1195184c9e8d3a --- velox/serializers/PrestoSerializer.cpp | 295 +++++++++++------- .../tests/PrestoSerializerTest.cpp | 179 ++++++++--- velox/vector/BaseVector.cpp | 16 + velox/vector/BaseVector.h | 2 + 4 files changed, 324 insertions(+), 168 deletions(-) diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index d505ea137169..45cf45d2d73b 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -1314,8 +1314,8 @@ class VectorStream { const SerdeOpts& opts) : type_(type), encoding_(getEncoding(encoding, vector)), - useLosslessTimestamp_(opts.useLosslessTimestamp), - nullsFirst_(opts.nullsFirst), + opts_(opts), + streamArena_(streamArena), nulls_(streamArena, true, true), lengths_(streamArena), values_(streamArena), @@ -1340,6 +1340,15 @@ class VectorStream { return; } case VectorEncoding::Simple::DICTIONARY: { + // For fixed width types 
that are no larger than int32_t (the type for + // indexes into the dictionary), dictionary encoding increases the + // size, so we should flatten it. + if (type->isFixedWidth() && + type->cppSizeInBytes() <= sizeof(int32_t)) { + encoding_ = std::nullopt; + break; + } + initializeHeader(kDictionary, *streamArena); values_.startWrite(initialNumRows * 4); isDictionaryStream_ = true; @@ -1357,42 +1366,24 @@ class VectorStream { } } - initializeHeader(typeToEncodingName(type), *streamArena); - nulls_.startWrite(1 + (initialNumRows / 8)); + initializeFlatStream(vector, initialNumRows); + } - switch (type_->kind()) { - case TypeKind::ROW: - [[fallthrough]]; - case TypeKind::ARRAY: - [[fallthrough]]; - case TypeKind::MAP: - hasLengths_ = true; - lengths_.startWrite(initialNumRows * sizeof(vector_size_t)); - children_.resize(type_->size()); - for (int32_t i = 0; i < type_->size(); ++i) { - children_[i] = std::make_unique( - type_->childAt(i), - std::nullopt, - getChildAt(vector, i), - streamArena, - initialNumRows, - opts); - } - // The first element in the offsets in the wire format is always 0 for - // nested types. 
- lengths_.appendOne(0); - break; - case TypeKind::VARCHAR: - [[fallthrough]]; - case TypeKind::VARBINARY: - hasLengths_ = true; - lengths_.startWrite(initialNumRows * sizeof(vector_size_t)); - values_.startWrite(initialNumRows * 10); - break; - default: - values_.startWrite(initialNumRows * 4); - break; + void flattenStream(const VectorPtr& vector, int32_t initialNumRows) { + VELOX_CHECK_EQ(nullCount_, 0); + VELOX_CHECK_EQ(nonNullCount_, 0); + VELOX_CHECK_EQ(totalLength_, 0); + + if (!isConstantStream_ && !isDictionaryStream_) { + return; } + + encoding_ = std::nullopt; + isConstantStream_ = false; + isDictionaryStream_ = false; + children_.clear(); + + initializeFlatStream(vector, initialNumRows); } std::optional getEncoding( @@ -1585,7 +1576,7 @@ class VectorStream { switch (type_->kind()) { case TypeKind::ROW: - if (nullsFirst_) { + if (opts_.nullsFirst) { writeInt32(out, nullCount_ + nonNullCount_); flushNulls(out); } @@ -1594,7 +1585,7 @@ class VectorStream { for (auto& child : children_) { child->flush(out); } - if (!nullsFirst_) { + if (!opts_.nullsFirst) { writeInt32(out, nullCount_ + nonNullCount_); lengths_.flush(out); flushNulls(out); @@ -1652,13 +1643,56 @@ class VectorStream { } private: + void initializeFlatStream( + std::optional vector, + vector_size_t initialNumRows) { + initializeHeader(typeToEncodingName(type_), *streamArena_); + nulls_.startWrite(1 + (initialNumRows / 8)); + + switch (type_->kind()) { + case TypeKind::ROW: + [[fallthrough]]; + case TypeKind::ARRAY: + [[fallthrough]]; + case TypeKind::MAP: + hasLengths_ = true; + lengths_.startWrite(initialNumRows * sizeof(vector_size_t)); + children_.resize(type_->size()); + for (int32_t i = 0; i < type_->size(); ++i) { + children_[i] = std::make_unique( + type_->childAt(i), + std::nullopt, + getChildAt(vector, i), + streamArena_, + initialNumRows, + opts_); + } + // The first element in the offsets in the wire format is always 0 for + // nested types. 
+ lengths_.appendOne(0); + break; + case TypeKind::VARCHAR: + [[fallthrough]]; + case TypeKind::VARBINARY: + hasLengths_ = true; + lengths_.startWrite(initialNumRows * sizeof(vector_size_t)); + if (values_.ranges().empty()) { + values_.startWrite(initialNumRows * 10); + } + break; + default: + if (values_.ranges().empty()) { + values_.startWrite(initialNumRows * 4); + } + break; + } + } + const TypePtr type_; - const std::optional encoding_; - /// Indicates whether to serialize timestamps with nanosecond precision. - /// If false, they are serialized with millisecond precision which is - /// compatible with presto. - const bool useLosslessTimestamp_; - const bool nullsFirst_; + std::optional encoding_; + const SerdeOpts opts_; + + StreamArena* streamArena_; int32_t nonNullCount_{0}; int32_t nullCount_{0}; int32_t totalLength_{0}; @@ -1684,7 +1718,7 @@ inline void VectorStream::append(folly::Range values) { template <> void VectorStream::append(folly::Range values) { - if (useLosslessTimestamp_) { + if (opts_.useLosslessTimestamp) { for (auto& value : values) { appendOne(value.getSeconds()); appendOne(value.getNanos()); @@ -1728,11 +1762,11 @@ void VectorStream::append(folly::Range values) { template void serializeFlatVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream) { using T = typename TypeTraits::NativeType; - auto* flatVector = dynamic_cast*>(vector); + auto* flatVector = vector->as>(); auto* rawValues = flatVector->rawValues(); if (!flatVector->mayHaveNulls()) { for (auto& range : ranges) { @@ -1777,10 +1811,10 @@ void serializeFlatVector( template <> void serializeFlatVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream) { - auto flatVector = dynamic_cast*>(vector); + auto flatVector = vector->as>(); if (!vector->mayHaveNulls()) { for (int32_t i = 0; i < ranges.size(); ++i) { stream->appendNonNull(ranges[i].size); @@ -1805,25 +1839,25 @@ 
void serializeFlatVector( } void serializeColumn( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream, Scratch& scratch); void serializeColumn( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& rows, VectorStream* stream, Scratch& scratch); void serializeWrapped( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream, Scratch& scratch) { std::vector newRanges; const bool mayHaveNulls = vector->mayHaveNulls(); - const BaseVector* wrapped = vector->wrappedVector(); + const VectorPtr& wrapped = BaseVector::wrappedVectorShared(vector); for (int32_t i = 0; i < ranges.size(); ++i) { const auto end = ranges[i].begin + ranges[i].size; for (int32_t offset = ranges[i].begin; offset < end; ++offset) { @@ -1846,11 +1880,11 @@ void serializeWrapped( } void serializeRowVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream, Scratch& scratch) { - auto rowVector = dynamic_cast(vector); + auto rowVector = vector->as(); std::vector childRanges; for (int32_t i = 0; i < ranges.size(); ++i) { @@ -1868,16 +1902,16 @@ void serializeRowVector( } for (int32_t i = 0; i < rowVector->childrenSize(); ++i) { serializeColumn( - rowVector->childAt(i).get(), childRanges, stream->childAt(i), scratch); + rowVector->childAt(i), childRanges, stream->childAt(i), scratch); } } void serializeArrayVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream, Scratch& scratch) { - auto arrayVector = dynamic_cast(vector); + auto arrayVector = vector->as(); auto rawSizes = arrayVector->rawSizes(); auto rawOffsets = arrayVector->rawOffsets(); std::vector childRanges; @@ -1899,15 +1933,15 @@ void serializeArrayVector( } } serializeColumn( - arrayVector->elements().get(), childRanges, stream->childAt(0), scratch); + arrayVector->elements(), childRanges, stream->childAt(0), 
scratch); } void serializeMapVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream, Scratch& scratch) { - auto mapVector = dynamic_cast(vector); + auto mapVector = vector->as(); auto rawSizes = mapVector->rawSizes(); auto rawOffsets = mapVector->rawOffsets(); std::vector childRanges; @@ -1929,9 +1963,9 @@ void serializeMapVector( } } serializeColumn( - mapVector->mapKeys().get(), childRanges, stream->childAt(0), scratch); + mapVector->mapKeys(), childRanges, stream->childAt(0), scratch); serializeColumn( - mapVector->mapValues().get(), childRanges, stream->childAt(1), scratch); + mapVector->mapValues(), childRanges, stream->childAt(1), scratch); } static inline int32_t rangesTotalSize( @@ -1945,14 +1979,20 @@ static inline int32_t rangesTotalSize( template void serializeDictionaryVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream, Scratch& scratch) { - // Cannot serialize dictionary as PrestoPage dictionary if it has nulls. - // Also check if the stream was set up for dictionary (we had to know the + // Check if the stream was set up for dictionary (we had to know the // encoding type when creating VectorStream for that). - if (vector->nulls() || !stream->isDictionaryStream()) { + if (!stream->isDictionaryStream()) { + serializeWrapped(vector, ranges, stream, scratch); + return; + } + + // Cannot serialize dictionary as PrestoPage dictionary if it has nulls. + if (vector->nulls()) { + stream->flattenStream(vector, rangesTotalSize(ranges)); serializeWrapped(vector, ranges, stream, scratch); return; } @@ -1962,10 +2002,9 @@ void serializeDictionaryVector( // Create a bit set to track which values in the Dictionary are used. 
ScratchPtr usedIndicesHolder(scratch); - auto usedIndicesCapacity = - bits::nwords(dictionaryVector->valueVector()->size()); - auto* usedIndices = usedIndicesHolder.get(usedIndicesCapacity); - simd::memset(usedIndices, 0, usedIndicesCapacity * sizeof(uint64_t)); + auto* usedIndices = usedIndicesHolder.get( + bits::nwords(dictionaryVector->valueVector()->size())); + simd::memset(usedIndices, 0, usedIndicesHolder.size() * sizeof(uint64_t)); auto* indices = dictionaryVector->indices()->template as(); vector_size_t numRows = 0; @@ -1981,14 +2020,36 @@ void serializeDictionaryVector( auto* mutableSelectedIndices = selectedIndicesHolder.get(dictionaryVector->valueVector()->size()); auto numUsed = simd::indicesOfSetBits( - usedIndices, - 0, - dictionaryVector->valueVector()->size(), - mutableSelectedIndices); + usedIndices, 0, selectedIndicesHolder.size(), mutableSelectedIndices); + + // If the values are fixed width and we aren't getting enough reuse to justify + // the dictionary, flatten it. + // For variable width types, rather than iterate over them computing their + // size, we simply assume we'll get a benefit. + if constexpr (TypeTraits::isFixedWidth) { + // This calculation admittedly ignores some constants, but if they really + // make a difference, they're small so there's not much difference either + // way. + if (numUsed * vector->type()->cppSizeInBytes() + + numRows * sizeof(int32_t) >= + numRows * vector->type()->cppSizeInBytes()) { + stream->flattenStream(vector, numRows); + serializeWrapped(vector, ranges, stream, scratch); + return; + } + } + + // If every element is unique the dictionary isn't giving us any benefit, + // flatten it. + if (numUsed == numRows) { + stream->flattenStream(vector, numRows); + serializeWrapped(vector, ranges, stream, scratch); + return; + } // Serialize the used elements from the Dictionary. 
serializeColumn( - dictionaryVector->valueVector().get(), + dictionaryVector->valueVector(), folly::Range(mutableSelectedIndices, numUsed), stream->childAt(0), scratch); @@ -2014,14 +2075,14 @@ void serializeDictionaryVector( template void serializeConstantVectorImpl( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream, Scratch& scratch) { using T = typename KindToFlatVector::WrapperType; - auto constVector = dynamic_cast*>(vector); + auto constVector = vector->as>(); if (constVector->valueVector() != nullptr) { - serializeWrapped(constVector, ranges, stream, scratch); + serializeWrapped(vector, ranges, stream, scratch); return; } @@ -2042,7 +2103,7 @@ void serializeConstantVectorImpl( template void serializeConstantVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream, Scratch& scratch) { @@ -2062,10 +2123,10 @@ void serializeConstantVector( template void serializeBiasVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream) { - auto biasVector = dynamic_cast*>(vector); + auto biasVector = vector->as>(); if (!vector->mayHaveNulls()) { for (int32_t i = 0; i < ranges.size(); ++i) { stream->appendNonNull(ranges[i].size); @@ -2090,7 +2151,7 @@ void serializeBiasVector( } void serializeColumn( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream, Scratch& scratch) { @@ -2144,7 +2205,8 @@ void serializeColumn( serializeMapVector(vector, ranges, stream, scratch); break; case VectorEncoding::Simple::LAZY: - serializeColumn(vector->loadedVector(), ranges, stream, scratch); + serializeColumn( + BaseVector::loadedVectorShared(vector), ranges, stream, scratch); break; default: serializeWrapped(vector, ranges, stream, scratch); @@ -2376,7 +2438,7 @@ void appendTimestamps( template void serializeFlatVector( - const BaseVector* vector, + const VectorPtr& vector, 
const folly::Range& rows, VectorStream* stream, Scratch& scratch) { @@ -2444,11 +2506,11 @@ uint64_t bitsToBytes(uint8_t byte) { template <> void serializeFlatVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& rows, VectorStream* stream, Scratch& scratch) { - auto* flatVector = reinterpret_cast*>(vector); + auto* flatVector = vector->as>(); auto* rawValues = flatVector->rawValues(); ScratchPtr bitsHolder(scratch); uint64_t* valueBits; @@ -2495,7 +2557,7 @@ void serializeFlatVector( } void serializeWrapped( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& rows, VectorStream* stream, Scratch& scratch) { @@ -2503,18 +2565,20 @@ void serializeWrapped( const int32_t numRows = rows.size(); int32_t numInner = 0; auto* innerRows = innerRowsHolder.get(numRows); - const BaseVector* wrapped; + bool mayHaveNulls = vector->mayHaveNulls(); + VectorPtr wrapped; if (vector->encoding() == VectorEncoding::Simple::DICTIONARY && - !vector->rawNulls()) { + !mayHaveNulls) { // Dictionary with no nulls. auto* indices = vector->wrapInfo()->as(); - wrapped = vector->valueVector().get(); + wrapped = vector->valueVector(); simd::transpose(indices, rows, innerRows); numInner = numRows; } else { - wrapped = vector->wrappedVector(); + wrapped = BaseVector::wrappedVectorShared(vector); for (int32_t i = 0; i < rows.size(); ++i) { - if (vector->isNullAt(rows[i])) { + if (mayHaveNulls && vector->isNullAt(rows[i])) { + // The wrapper added a null. 
if (numInner > 0) { serializeColumn( wrapped, @@ -2529,6 +2593,7 @@ void serializeWrapped( innerRows[numInner++] = vector->wrappedIndex(rows[i]); } } + if (numInner > 0) { serializeColumn( wrapped, @@ -2540,7 +2605,7 @@ void serializeWrapped( template <> void serializeFlatVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& rows, VectorStream* stream, Scratch& scratch) { @@ -2553,7 +2618,7 @@ void serializeFlatVector( template <> void serializeFlatVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& ranges, VectorStream* stream, Scratch& scratch) { @@ -2561,11 +2626,11 @@ void serializeFlatVector( } void serializeRowVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& rows, VectorStream* stream, Scratch& scratch) { - auto rowVector = reinterpret_cast(vector); + auto rowVector = vector->as(); vector_size_t* childRows; int32_t numChildRows = 0; ScratchPtr nullsHolder(scratch); @@ -2590,7 +2655,7 @@ void serializeRowVector( } for (int32_t i = 0; i < rowVector->childrenSize(); ++i) { serializeColumn( - rowVector->childAt(i).get(), + rowVector->childAt(i), folly::Range(innerRows, numInnerRows), stream->childAt(i), scratch); @@ -2598,11 +2663,11 @@ void serializeRowVector( } void serializeArrayVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& rows, VectorStream* stream, Scratch& scratch) { - auto arrayVector = reinterpret_cast(vector); + auto arrayVector = vector->as(); ScratchPtr rangesHolder(scratch); int32_t numRanges = rowsToRanges( @@ -2619,18 +2684,18 @@ void serializeArrayVector( return; } serializeColumn( - arrayVector->elements().get(), + arrayVector->elements(), folly::Range(rangesHolder.get(), numRanges), stream->childAt(0), scratch); } void serializeMapVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& rows, VectorStream* stream, Scratch& scratch) { - auto mapVector = reinterpret_cast(vector); + auto 
mapVector = vector->as(); ScratchPtr rangesHolder(scratch); int32_t numRanges = rowsToRanges( @@ -2647,12 +2712,12 @@ void serializeMapVector( return; } serializeColumn( - mapVector->mapKeys().get(), + mapVector->mapKeys(), folly::Range(rangesHolder.get(), numRanges), stream->childAt(0), scratch); serializeColumn( - mapVector->mapValues().get(), + mapVector->mapValues(), folly::Range(rangesHolder.get(), numRanges), stream->childAt(1), scratch); @@ -2660,14 +2725,14 @@ void serializeMapVector( template void serializeConstantVector( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& rows, VectorStream* stream, Scratch& scratch) { using T = typename KindToFlatVector::WrapperType; - auto constVector = dynamic_cast*>(vector); + auto constVector = vector->as>(); if (constVector->valueVector()) { - serializeWrapped(constVector, rows, stream, scratch); + serializeWrapped(vector, rows, stream, scratch); return; } const auto numRows = rows.size(); @@ -2695,7 +2760,7 @@ void serializeBiasVector( } void serializeColumn( - const BaseVector* vector, + const VectorPtr& vector, const folly::Range& rows, VectorStream* stream, Scratch& scratch) { @@ -2730,7 +2795,8 @@ void serializeColumn( serializeMapVector(vector, rows, stream, scratch); break; case VectorEncoding::Simple::LAZY: - serializeColumn(vector->loadedVector(), rows, stream, scratch); + serializeColumn( + BaseVector::loadedVectorShared(vector), rows, stream, scratch); break; default: serializeWrapped(vector, rows, stream, scratch); @@ -3451,8 +3517,7 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { numRows, opts_); - serializeColumn( - vector->childAt(i).get(), ranges, streams[i].get(), scratch); + serializeColumn(vector->childAt(i), ranges, streams[i].get(), scratch); } flushStreams(streams, numRows, arena, *codec_, stream); @@ -3493,8 +3558,7 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { } numRows_ += numNewRows; for (int32_t i = 0; i < 
vector->childrenSize(); ++i) { - serializeColumn( - vector->childAt(i).get(), ranges, streams_[i].get(), scratch); + serializeColumn(vector->childAt(i), ranges, streams_[i].get(), scratch); } } @@ -3508,8 +3572,7 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { } numRows_ += numNewRows; for (int32_t i = 0; i < vector->childrenSize(); ++i) { - serializeColumn( - vector->childAt(i).get(), rows, streams_[i].get(), scratch); + serializeColumn(vector->childAt(i), rows, streams_[i].get(), scratch); } } diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 3e3338a155d9..48af91381e8d 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -298,15 +298,27 @@ class PrestoSerializerTest void assertEqualEncoding( const RowVectorPtr& expected, - const RowVectorPtr& actual) { + const RowVectorPtr& actual, + // If true, we allow the encodings of actual and expected to differ if a + // dictionary was flattened. 
+ bool allowFlatteningDictionaries = false) { for (auto i = 0; i < expected->childrenSize(); ++i) { - VELOX_CHECK_EQ( - actual->childAt(i)->encoding(), expected->childAt(i)->encoding()); + auto expectedEncoding = expected->childAt(i)->encoding(); + auto actualEncoding = actual->childAt(i)->encoding(); - if (expected->childAt(i)->encoding() == VectorEncoding::Simple::ROW) { + if (allowFlatteningDictionaries && + actualEncoding == VectorEncoding::Simple::FLAT && + expectedEncoding == VectorEncoding::Simple::DICTIONARY) { + continue; + } + + VELOX_CHECK_EQ(actualEncoding, expectedEncoding); + + if (expectedEncoding == VectorEncoding::Simple::ROW) { assertEqualEncoding( std::dynamic_pointer_cast(expected->childAt(i)), - std::dynamic_pointer_cast(actual->childAt(i))); + std::dynamic_pointer_cast(actual->childAt(i)), + allowFlatteningDictionaries); } } } @@ -314,13 +326,15 @@ class PrestoSerializerTest void verifySerializedEncodedData( const RowVectorPtr& original, const std::string& serialized, + bool allowFlatteningDictionaries, const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions) { auto rowType = asRowType(original->type()); auto deserialized = deserialize(rowType, serialized, serdeOptions); assertEqualVectors(original, deserialized); - assertEqualEncoding(original, deserialized); + // Dictionaries may get flattened depending on the nature of the data. + assertEqualEncoding(original, deserialized, allowFlatteningDictionaries); // Deserialize 3 times while appending to a single vector. 
auto paramOptions = getParamSerdeOptions(serdeOptions); @@ -402,17 +416,6 @@ class PrestoSerializerTest assertEqualVectors(concatenation, deserialized); } - void testEncodedRoundTrip( - const RowVectorPtr& data, - const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = - nullptr) { - std::ostringstream out; - serializeBatch(data, &out, serdeOptions); - const auto serialized = out.str(); - - verifySerializedEncodedData(data, serialized, serdeOptions); - } - void serializeBatch( const RowVectorPtr& rowVector, std::ostream* output, @@ -428,19 +431,22 @@ class PrestoSerializerTest void testBatchVectorSerializerRoundTrip( const RowVectorPtr& data, + bool allowFlatteningDictionaries = false, const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = nullptr) { std::ostringstream out; serializeBatch(data, &out, serdeOptions); const auto serialized = out.str(); - verifySerializedEncodedData(data, serialized, serdeOptions); + verifySerializedEncodedData( + data, serialized, allowFlatteningDictionaries, serdeOptions); } RowVectorPtr encodingsTestVector() { - auto baseNoNulls = makeFlatVector({1, 2, 3, 4}); + // String is variable length, so this ensures the data isn't flattened. 
+ auto baseNoNulls = makeFlatVector({"a", "b", "c", "d"}); auto baseWithNulls = - makeNullableFlatVector({1, std::nullopt, 2, 3}); + makeNullableFlatVector({"a", std::nullopt, "b", "c"}); auto baseArray = makeArrayVector({{1, 2, 3}, {}, {4, 5}, {6, 7, 8, 9, 10}}); auto indices = makeIndices(8, [](auto row) { return row / 2; }); @@ -593,23 +599,27 @@ class PrestoSerializerTest void testMinimalDictionaryEncoding( vector_size_t numRows, vector_size_t alphabetSize) { - auto base = makeFlatVector( - alphabetSize, [](vector_size_t row) { return row; }); - auto allIndices = makeIndices( - numRows, [alphabetSize](auto row) { return row % alphabetSize; }); - auto evenIndices = makeIndices( - numRows, [alphabetSize](auto row) { return (row * 2) % alphabetSize; }); - auto oddIndices = makeIndices(numRows, [alphabetSize](auto row) { - return (row * 2 + 1) % alphabetSize; + // This factor is used to ensure we have some repetition in the dictionary + // in each of the cases to ensure the serializer doesn't flatten the data. + auto factor = alphabetSize <= numRows ? 2 : alphabetSize / numRows * 2; + // String is variable length, so this ensures the data isn't flattened. 
+ auto base = makeFlatVector( + alphabetSize, [](vector_size_t row) { return fmt::format("{}", row); }); + auto evenIndices = makeIndices(numRows, [alphabetSize, factor](auto row) { + return (row * factor) % alphabetSize; + }); + auto oddIndices = makeIndices(numRows, [alphabetSize, factor](auto row) { + return (row * factor + 1) % alphabetSize; + }); + auto prefixIndices = makeIndices(numRows, [alphabetSize, factor](auto row) { + return row % (alphabetSize / factor); }); - auto prefixIndices = makeIndices( - numRows, [alphabetSize](auto row) { return row % (alphabetSize / 2); }); - auto suffixIndices = makeIndices(numRows, [alphabetSize](auto row) { - return row % (alphabetSize / 2) + (alphabetSize / 2); + auto suffixIndices = makeIndices(numRows, [alphabetSize, factor](auto row) { + return row % (alphabetSize / factor) + + (factor - 1) * (alphabetSize / factor); }); auto rows = makeRowVector({ - BaseVector::wrapInDictionary(nullptr, allIndices, numRows, base), BaseVector::wrapInDictionary(nullptr, evenIndices, numRows, base), BaseVector::wrapInDictionary(nullptr, oddIndices, numRows, base), BaseVector::wrapInDictionary(nullptr, prefixIndices, numRows, base), @@ -627,40 +637,30 @@ class PrestoSerializerTest assertEqualVectors(rows, deserialized); assertEqualEncoding(rows, deserialized); - // If the dictionary is larger than numRows, we'll still get distinct - // indices after applying modulus. 
- auto expectedSmallerAlphabetSize = std::min(alphabetSize / 2, numRows); - ASSERT_EQ( deserialized->childAt(0) - ->as>() + ->as>() ->valueVector() ->size(), - std::min(alphabetSize, numRows)); + alphabetSize / factor); ASSERT_EQ( deserialized->childAt(1) - ->as>() + ->as>() ->valueVector() ->size(), - expectedSmallerAlphabetSize); + alphabetSize / factor); ASSERT_EQ( deserialized->childAt(2) - ->as>() + ->as>() ->valueVector() ->size(), - expectedSmallerAlphabetSize); + alphabetSize / factor); ASSERT_EQ( deserialized->childAt(3) - ->as>() - ->valueVector() - ->size(), - expectedSmallerAlphabetSize); - ASSERT_EQ( - deserialized->childAt(4) - ->as>() + ->as>() ->valueVector() ->size(), - expectedSmallerAlphabetSize); + alphabetSize / factor); } std::unique_ptr serde_; @@ -926,6 +926,81 @@ TEST_P(PrestoSerializerTest, minimalDictionaryEncodings) { testMinimalDictionaryEncoding(32, 64); } +// Test that dictionary encoded inputs are flattened in cases where it doesn't +// help. +TEST_P(PrestoSerializerTest, dictionaryEncodingTurnedOff) { + auto smallIntBase = + makeFlatVector(32, [](vector_size_t row) { return row; }); + auto intBase = + makeFlatVector(32, [](vector_size_t row) { return row; }); + auto bigintBase = + makeFlatVector(32, [](vector_size_t row) { return row; }); + auto stringBase = makeFlatVector( + 32, [](vector_size_t row) { return fmt::format("{}", row); }); + auto oneIndex = makeIndices(32, [](auto) { return 0; }); + auto quarterIndices = makeIndices(32, [](auto row) { return row % 8; }); + auto allButOneIndices = makeIndices(32, [](auto row) { return row % 31; }); + auto allIndices = makeIndices(32, [](auto row) { return row; }); + + auto rows = makeRowVector({ + // Even though these have very effective dictionary encoding, they should + // be flattened because their types are too small. 
+ BaseVector::wrapInDictionary(nullptr, oneIndex, 32, smallIntBase), + BaseVector::wrapInDictionary(nullptr, oneIndex, 32, intBase), + // These should keep dictionary encoding. + BaseVector::wrapInDictionary(nullptr, oneIndex, 32, bigintBase), + BaseVector::wrapInDictionary(nullptr, quarterIndices, 32, bigintBase), + // These should be flattened because dictionary encoding isn't giving + // enough benefit to outweigh the cost. + BaseVector::wrapInDictionary(nullptr, allButOneIndices, 32, bigintBase), + BaseVector::wrapInDictionary(nullptr, allIndices, 32, bigintBase), + // These should keep dictionary encoding because strings are variable + // length. + BaseVector::wrapInDictionary(nullptr, oneIndex, 32, stringBase), + BaseVector::wrapInDictionary(nullptr, quarterIndices, 32, stringBase), + BaseVector::wrapInDictionary(nullptr, allButOneIndices, 32, stringBase), + // This should be flattened because the alphabet is the same as the + // flattened vector. + BaseVector::wrapInDictionary(nullptr, allIndices, 32, stringBase), + }); + + std::ostringstream out; + serializeBatch(rows, &out, /*serdeOptions=*/nullptr); + const auto serialized = out.str(); + + auto rowType = asRowType(rows->type()); + auto deserialized = + deserialize(rowType, serialized, /*serdeOptions=*/nullptr); + + assertEqualVectors(rows, deserialized); + + // smallInt + one index + ASSERT_EQ(deserialized->childAt(0)->encoding(), VectorEncoding::Simple::FLAT); + // int + one index + ASSERT_EQ(deserialized->childAt(1)->encoding(), VectorEncoding::Simple::FLAT); + // bigint + one index + ASSERT_EQ( + deserialized->childAt(2)->encoding(), VectorEncoding::Simple::DICTIONARY); + // bigint + quarter indices + ASSERT_EQ( + deserialized->childAt(3)->encoding(), VectorEncoding::Simple::DICTIONARY); + // bigint + all but one indices + ASSERT_EQ(deserialized->childAt(4)->encoding(), VectorEncoding::Simple::FLAT); + // bigint + all indices + ASSERT_EQ(deserialized->childAt(5)->encoding(), 
VectorEncoding::Simple::FLAT); + // string + one index + ASSERT_EQ( + deserialized->childAt(6)->encoding(), VectorEncoding::Simple::DICTIONARY); + // string + quarter indices + ASSERT_EQ( + deserialized->childAt(7)->encoding(), VectorEncoding::Simple::DICTIONARY); + // string + all but one indices + ASSERT_EQ( + deserialized->childAt(8)->encoding(), VectorEncoding::Simple::DICTIONARY); + // string + all indices + ASSERT_EQ(deserialized->childAt(9)->encoding(), VectorEncoding::Simple::FLAT); +} + TEST_P(PrestoSerializerTest, lazy) { constexpr int kSize = 1000; auto rowVector = makeTestVector(kSize); @@ -996,7 +1071,7 @@ TEST_P(PrestoSerializerTest, encodedRoundtrip) { auto inputRowVector = fuzzer.fuzzInputRow(rowType); serializer::presto::PrestoVectorSerde::PrestoOptions serdeOpts; serdeOpts.nullsFirst = i % 2 == 0; - testEncodedRoundTrip(inputRowVector, &serdeOpts); + testBatchVectorSerializerRoundTrip(inputRowVector, true, &serdeOpts); } } diff --git a/velox/vector/BaseVector.cpp b/velox/vector/BaseVector.cpp index d64f37430fe7..60344209b1d7 100644 --- a/velox/vector/BaseVector.cpp +++ b/velox/vector/BaseVector.cpp @@ -24,6 +24,7 @@ #include "velox/vector/LazyVector.h" #include "velox/vector/SequenceVector.h" #include "velox/vector/TypeAliases.h" +#include "velox/vector/VectorEncoding.h" #include "velox/vector/VectorPool.h" #include "velox/vector/VectorTypeUtils.h" @@ -765,6 +766,21 @@ VectorPtr BaseVector::transpose(BufferPtr indices, VectorPtr&& source) { BufferPtr(nullptr), std::move(indices), size, std::move(source)); } +// static +const VectorPtr& BaseVector::wrappedVectorShared(const VectorPtr& vector) { + switch (vector->encoding()) { + case VectorEncoding::Simple::CONSTANT: + case VectorEncoding::Simple::DICTIONARY: + case VectorEncoding::Simple::SEQUENCE: + return vector->valueVector() ? 
wrappedVectorShared(vector->valueVector()) + : vector; + case VectorEncoding::Simple::LAZY: + return wrappedVectorShared(loadedVectorShared(vector)); + default: + return vector; + } +} + bool isLazyNotLoaded(const BaseVector& vector) { switch (vector.encoding()) { case VectorEncoding::Simple::LAZY: diff --git a/velox/vector/BaseVector.h b/velox/vector/BaseVector.h index 591a0fc14dce..5610b384c971 100644 --- a/velox/vector/BaseVector.h +++ b/velox/vector/BaseVector.h @@ -384,6 +384,8 @@ class BaseVector { return this; } + static const VectorPtr& wrappedVectorShared(const VectorPtr& vector); + // Returns the index to apply for 'index' in the vector returned by // wrappedVector(). Translates the index over any nesting of // dictionaries, sequences and constants.