From c40fbdcaddccf8d8121ec37f98e52475b813687a Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Thu, 4 Jul 2024 17:09:56 +0800 Subject: [PATCH] Add RowsStreamingWindowBuild to avoid OOM in Window operator (9025) --- velox/exec/AggregateWindow.cpp | 3 +- velox/exec/CMakeLists.txt | 1 + velox/exec/RowsStreamingWindowBuild.cpp | 88 ++++++++++++++++++++ velox/exec/RowsStreamingWindowBuild.h | 84 +++++++++++++++++++ velox/exec/SortWindowBuild.cpp | 6 +- velox/exec/SortWindowBuild.h | 6 +- velox/exec/StreamingWindowBuild.cpp | 4 +- velox/exec/StreamingWindowBuild.h | 6 +- velox/exec/Window.cpp | 73 +++++++++++++++-- velox/exec/Window.h | 4 +- velox/exec/WindowBuild.h | 13 ++- velox/exec/WindowFunction.cpp | 14 +++- velox/exec/WindowFunction.h | 26 +++++- velox/exec/WindowPartition.cpp | 91 ++++++++++++++++++--- velox/exec/WindowPartition.h | 65 ++++++++++++++- velox/exec/fuzzer/AggregationFuzzer.cpp | 3 + velox/exec/fuzzer/AggregationFuzzerBase.cpp | 14 ++++ velox/exec/fuzzer/AggregationFuzzerBase.h | 2 + velox/exec/fuzzer/WindowFuzzer.cpp | 34 ++++++-- velox/exec/fuzzer/WindowFuzzer.h | 6 +- velox/exec/tests/WindowTest.cpp | 69 ++++++++++++++++ velox/functions/lib/window/Rank.cpp | 21 +++-- velox/functions/lib/window/RowNumber.cpp | 3 +- 23 files changed, 589 insertions(+), 47 deletions(-) create mode 100644 velox/exec/RowsStreamingWindowBuild.cpp create mode 100644 velox/exec/RowsStreamingWindowBuild.h diff --git a/velox/exec/AggregateWindow.cpp b/velox/exec/AggregateWindow.cpp index cb32bd0779c3..b3a003974826 100644 --- a/velox/exec/AggregateWindow.cpp +++ b/velox/exec/AggregateWindow.cpp @@ -426,7 +426,8 @@ void registerAggregateWindowFunction(const std::string& name) { pool, stringAllocator, config); - }); + }, + {exec::ProcessedUnit::kRows, true}); } } } // namespace facebook::velox::exec diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index dc89f71dbdc5..0fa9c4a41bc8 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -62,6 +62,7 
@@ add_library( PlanNodeStats.cpp PrefixSort.cpp ProbeOperatorState.cpp + RowsStreamingWindowBuild.cpp RowContainer.cpp RowNumber.cpp SortBuffer.cpp diff --git a/velox/exec/RowsStreamingWindowBuild.cpp b/velox/exec/RowsStreamingWindowBuild.cpp new file mode 100644 index 000000000000..2a964f9d3da3 --- /dev/null +++ b/velox/exec/RowsStreamingWindowBuild.cpp @@ -0,0 +1,88 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/RowsStreamingWindowBuild.h" +#include "velox/common/testutil/TestValue.h" + +namespace facebook::velox::exec { + +RowsStreamingWindowBuild::RowsStreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool, + const common::SpillConfig* spillConfig, + tsan_atomic* nonReclaimableSection) + : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {} + +void RowsStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) { + if (windowPartitions_.size() <= inputPartition_) { + windowPartitions_.push_back(std::make_shared( + data_.get(), inversedInputChannels_, sortKeyInfo_)); + } + + windowPartitions_[inputPartition_]->addRows(inputRows_); + + if (isFinished) { + windowPartitions_[inputPartition_]->setComplete(); + inputPartition_++; + } + + inputRows_.clear(); +} + +void RowsStreamingWindowBuild::addInput(RowVectorPtr input) { + velox::common::testutil::TestValue::adjust( + 
"facebook::velox::exec::RowsStreamingWindowBuild::addInput", this); + for (auto i = 0; i < inputChannels_.size(); ++i) { + decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i])); + } + + for (auto row = 0; row < input->size(); ++row) { + char* newRow = data_->newRow(); + + for (auto col = 0; col < input->childrenSize(); ++col) { + data_->store(decodedInputVectors_[col], row, newRow, col); + } + + if (previousRow_ != nullptr && + compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) { + buildNextInputOrPartition(true); + } + + if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_) { + buildNextInputOrPartition(false); + } + + inputRows_.push_back(newRow); + previousRow_ = newRow; + } +} + +void RowsStreamingWindowBuild::noMoreInput() { + buildNextInputOrPartition(true); +} + +std::shared_ptr RowsStreamingWindowBuild::nextPartition() { + // The previous partition has already been set to nullptr by the + // Window.cpp#callResetPartition() method. + return windowPartitions_[++outputPartition_]; +} + +bool RowsStreamingWindowBuild::hasNextPartition() { + return windowPartitions_.size() > 0 && + outputPartition_ <= int(windowPartitions_.size() - 2); +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/RowsStreamingWindowBuild.h b/velox/exec/RowsStreamingWindowBuild.h new file mode 100644 index 000000000000..5bccb3d24c7e --- /dev/null +++ b/velox/exec/RowsStreamingWindowBuild.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/WindowBuild.h" + +namespace facebook::velox::exec { + +/// Unlike StreamingWindowBuild, RowsStreamingWindowBuild is capable of +/// processing window functions as rows arrive within a single partition, +/// without the need to wait for the entire partition to be ready. This approach +/// can significantly reduce memory usage, especially when a single partition +/// contains a large amount of data. It is particularly suited for optimizing +/// rank and row_number functions, as well as aggregate window functions with a +/// default frame. +class RowsStreamingWindowBuild : public WindowBuild { + public: + RowsStreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool, + const common::SpillConfig* spillConfig, + tsan_atomic* nonReclaimableSection); + + void addInput(RowVectorPtr input) override; + + void spill() override { + VELOX_UNREACHABLE(); + } + + std::optional spilledStats() const override { + return std::nullopt; + } + + void noMoreInput() override; + + bool hasNextPartition() override; + + std::shared_ptr nextPartition() override; + + bool needsInput() override { + // No partitions are available or the currentPartition is the last available + // one, so can consume input rows. + return windowPartitions_.size() == 0 || + outputPartition_ == windowPartitions_.size() - 1; + } + + std::string_view windowBuildType() const override { + return "RowsStreamingWindowBuild"; + } + + private: + void buildNextInputOrPartition(bool isFinished); + + // Holds input rows within the current partition. + std::vector inputRows_; + + // Used to compare rows based on partitionKeys. + char* previousRow_ = nullptr; + + // Current partition being output. Used to return the WindowPartitions. + vector_size_t outputPartition_ = -1; + + // Current partition when adding input. 
Used to construct WindowPartitions. + vector_size_t inputPartition_ = 0; + + // Holds all the WindowPartitions. + std::vector> windowPartitions_; +}; + +} // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index c65009012fdd..400b1edbe636 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -294,11 +294,11 @@ void SortWindowBuild::loadNextPartitionFromSpill() { } } -std::unique_ptr SortWindowBuild::nextPartition() { +std::shared_ptr SortWindowBuild::nextPartition() { if (merge_ != nullptr) { VELOX_CHECK(!sortedRows_.empty(), "No window partitions available") auto partition = folly::Range(sortedRows_.data(), sortedRows_.size()); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } @@ -316,7 +316,7 @@ std::unique_ptr SortWindowBuild::nextPartition() { auto partition = folly::Range( sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index 645949ddb7e0..900978ba9399 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -53,7 +53,11 @@ class SortWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; + + std::string_view windowBuildType() const override { + return "SortWindowBuild"; + } private: void ensureInputFits(const RowVectorPtr& input); diff --git a/velox/exec/StreamingWindowBuild.cpp b/velox/exec/StreamingWindowBuild.cpp index 791d6b886ca7..db1fb5846fbb 100644 --- a/velox/exec/StreamingWindowBuild.cpp +++ b/velox/exec/StreamingWindowBuild.cpp @@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() { partitionStartRows_.push_back(sortedRows_.size()); } 
-std::unique_ptr StreamingWindowBuild::nextPartition() { +std::shared_ptr StreamingWindowBuild::nextPartition() { VELOX_CHECK_GT( partitionStartRows_.size(), 0, "No window partitions available") @@ -89,7 +89,7 @@ std::unique_ptr StreamingWindowBuild::nextPartition() { sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } diff --git a/velox/exec/StreamingWindowBuild.h b/velox/exec/StreamingWindowBuild.h index 2573f1f7e8d5..f69079f7d625 100644 --- a/velox/exec/StreamingWindowBuild.h +++ b/velox/exec/StreamingWindowBuild.h @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; bool needsInput() override { // No partitions are available or the currentPartition is the last available @@ -55,6 +55,10 @@ class StreamingWindowBuild : public WindowBuild { currentPartition_ == partitionStartRows_.size() - 2; } + std::string_view windowBuildType() const override { + return "StreamingWindowBuild"; + } + private: void buildNextPartition(); diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index 75fad130afe6..f89a76591e0f 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/Window.h" #include "velox/exec/OperatorUtils.h" +#include "velox/exec/RowsStreamingWindowBuild.h" #include "velox/exec/SortWindowBuild.h" #include "velox/exec/StreamingWindowBuild.h" #include "velox/exec/Task.h" @@ -41,8 +42,13 @@ Window::Window( auto* spillConfig = spillConfig_.has_value() ? 
&spillConfig_.value() : nullptr; if (windowNode->inputsSorted()) { - windowBuild_ = std::make_unique( - windowNode, pool(), spillConfig, &nonReclaimableSection_); + if (supportRowsStreaming()) { + windowBuild_ = std::make_unique( + windowNode_, pool(), spillConfig, &nonReclaimableSection_); + } else { + windowBuild_ = std::make_unique( + windowNode, pool(), spillConfig, &nonReclaimableSection_); + } } else { windowBuild_ = std::make_unique( windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_); @@ -54,6 +60,7 @@ void Window::initialize() { VELOX_CHECK_NOT_NULL(windowNode_); createWindowFunctions(); createPeerAndFrameBuffers(); + windowBuild_->setNumRowsPerOutput(numRowsPerOutput_); windowNode_.reset(); } @@ -187,6 +194,37 @@ void Window::createWindowFunctions() { } } +// Support 'rank', 'dense_rank' and +// 'row_number' functions and the agg window function with default frame. +bool Window::supportRowsStreaming() { + bool supportsStreaming = false; + + for (const auto& windowNodeFunction : windowNode_->windowFunctions()) { + const auto& functionName = windowNodeFunction.functionCall->name(); + auto windowFunctionMetadata = + exec::getWindowFunctionMetadata(functionName).value(); + + if (windowFunctionMetadata.processedUnit == ProcessedUnit::kRows) { + const auto& frame = windowNodeFunction.frame; + bool isDefaultFrame = + (frame.startType == + core::WindowNode::BoundType::kUnboundedPreceding && + frame.endType == core::WindowNode::BoundType::kCurrentRow); + if (!windowFunctionMetadata.isAggregateWindow || isDefaultFrame) { + supportsStreaming = true; + } else { + supportsStreaming = false; + break; + } + } else { + supportsStreaming = false; + break; + } + } + + return supportsStreaming; +} + void Window::addInput(RowVectorPtr input) { windowBuild_->addInput(input); numRows_ += input->size(); @@ -543,9 +581,12 @@ void Window::callApplyForPartitionRows( vector_size_t endRow, vector_size_t resultOffset, const RowVectorPtr& result) { - 
getInputColumns(startRow, endRow, resultOffset, result); - + // The lastRow that was retained from the previous batch will be deleted in + // the computePeerAndFrameBuffers method after peer group compare. Therefore, + // the getInputColumns method needs to be called subsequently. computePeerAndFrameBuffers(startRow, endRow); + + getInputColumns(startRow, endRow, resultOffset, result); vector_size_t numFuncs = windowFunctions_.size(); for (auto w = 0; w < numFuncs; w++) { windowFunctions_[w]->apply( @@ -561,6 +602,10 @@ void Window::callApplyForPartitionRows( vector_size_t numRows = endRow - startRow; numProcessedRows_ += numRows; partitionOffset_ += numRows; + + if (currentPartition_->isPartial()) { + currentPartition_->clearOutputRows(numRows); + } } vector_size_t Window::callApplyLoop( @@ -574,8 +619,9 @@ vector_size_t Window::callApplyLoop( // This function requires that the currentPartition_ is available for output. VELOX_DCHECK_NOT_NULL(currentPartition_); while (numOutputRowsLeft > 0) { - auto rowsForCurrentPartition = - currentPartition_->numRows() - partitionOffset_; + auto rowsForCurrentPartition = currentPartition_->isPartial() + ? currentPartition_->numRowsForProcessing() + : currentPartition_->numRowsForProcessing() - partitionOffset_; if (rowsForCurrentPartition <= numOutputRowsLeft) { // Current partition can fit completely in the output buffer. // So output all its rows. @@ -586,6 +632,13 @@ vector_size_t Window::callApplyLoop( result); resultIndex += rowsForCurrentPartition; numOutputRowsLeft -= rowsForCurrentPartition; + + if (!currentPartition_->isComplete()) { + // Still more data for the current partition would need to be processed. + // So resume on the next getOutput call. 
+ break; + } + + callResetPartition(); if (!currentPartition_) { // The WindowBuild doesn't have any more partitions to process right @@ -628,6 +681,14 @@ RowVectorPtr Window::getOutput() { } } + if (!currentPartition_->isComplete() && + (currentPartition_->numRowsForProcessing() == 0)) { + // The numRows may be 1, because we keep the last row in previous batch to + // compare with the first row in next batch to determine whether they are in + // same peer group. + return nullptr; + } + auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft); auto result = BaseVector::create( outputType_, numOutputRows, operatorCtx_->pool()); diff --git a/velox/exec/Window.h b/velox/exec/Window.h index 9be9a011baae..88e3eaa94432 100644 --- a/velox/exec/Window.h +++ b/velox/exec/Window.h @@ -88,6 +88,8 @@ class Window : public Operator { const std::optional end; }; + bool supportRowsStreaming(); + // Creates WindowFunction and frame objects for this operator. void createWindowFunctions(); @@ -165,7 +167,7 @@ class Window : public Operator { // Used to access window partition rows and columns by the window // operator and functions. This structure is owned by the WindowBuild. - std::unique_ptr currentPartition_; + std::shared_ptr currentPartition_; // HashStringAllocator required by functions that allocate out of line // buffers. diff --git a/velox/exec/WindowBuild.h b/velox/exec/WindowBuild.h index 9a1c0a6bfd7a..25e117df6d6b 100644 --- a/velox/exec/WindowBuild.h +++ b/velox/exec/WindowBuild.h @@ -68,7 +68,11 @@ class WindowBuild { // the underlying columns of Window partition data. // Check hasNextPartition() before invoking this function. This function fails // if called when no partition is available. - virtual std::unique_ptr nextPartition() = 0; + virtual std::shared_ptr nextPartition() = 0; + + /// The type of WindowBuild: RowsStreamingWindowBuild, StreamingWindowBuild + /// and SortWindowBuild. 
+ virtual std::string_view windowBuildType() const = 0; // Returns the average size of input rows in bytes stored in the // data container of the WindowBuild. @@ -76,6 +80,10 @@ class WindowBuild { return data_->estimateRowSize(); } + void setNumRowsPerOutput(vector_size_t numRowsPerOutput) { + numRowsPerOutput_ = numRowsPerOutput; + } + protected: bool compareRowsWithKeys( const char* lhs, @@ -113,6 +121,9 @@ class WindowBuild { // Number of input rows. vector_size_t numRows_ = 0; + + // The maximum number of rows that can fit into an output block. + vector_size_t numRowsPerOutput_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/WindowFunction.cpp b/velox/exec/WindowFunction.cpp index b093024a1dbe..b3419c3ba380 100644 --- a/velox/exec/WindowFunction.cpp +++ b/velox/exec/WindowFunction.cpp @@ -41,13 +41,23 @@ std::optional getWindowFunctionEntry( bool registerWindowFunction( const std::string& name, std::vector signatures, - WindowFunctionFactory factory) { + WindowFunctionFactory factory, + WindowFunctionMetadata metadata) { auto sanitizedName = sanitizeName(name); windowFunctions()[sanitizedName] = { - std::move(signatures), std::move(factory)}; + std::move(signatures), std::move(factory), std::move(metadata)}; return true; } +std::optional getWindowFunctionMetadata( + const std::string& name) { + auto sanitizedName = sanitizeName(name); + if (auto func = getWindowFunctionEntry(sanitizedName)) { + return func.value()->metadata; + } + return std::nullopt; +} + std::optional> getWindowFunctionSignatures( const std::string& name) { auto sanitizedName = sanitizeName(name); diff --git a/velox/exec/WindowFunction.h b/velox/exec/WindowFunction.h index ee0ef26869c1..8e1f91e2cb75 100644 --- a/velox/exec/WindowFunction.h +++ b/velox/exec/WindowFunction.h @@ -31,6 +31,24 @@ struct WindowFunctionArg { std::optional index; }; +/// The ProcessedUnit for calculating the window function. 
+enum class ProcessedUnit { + // Calculation may start only after all rows within a partition are + // available. + kPartition, + // Calculation may begin as soon as rows are available within a single + // partition, without waiting for all data in the partition to be ready. + kRows, +}; + +/// Indicates whether the function is for an aggregate used as a window +/// function. It also specifies whether the ProcessedUnit of Window function is +/// by partition or by rows. +struct WindowFunctionMetadata { + ProcessedUnit processedUnit; + bool isAggregateWindow; +}; + class WindowFunction { public: explicit WindowFunction( @@ -149,7 +167,8 @@ using WindowFunctionFactory = std::function( bool registerWindowFunction( const std::string& name, std::vector signatures, - WindowFunctionFactory factory); + WindowFunctionFactory factory, + WindowFunctionMetadata metadata = {ProcessedUnit::kPartition, false}); /// Returns signatures of the window function with the specified name. /// Returns empty std::optional if function with that name is not found. @@ -159,8 +178,13 @@ std::optional> getWindowFunctionSignatures( struct WindowFunctionEntry { std::vector signatures; WindowFunctionFactory factory; + WindowFunctionMetadata metadata; }; +/// Returns std::nullopt if the function doesn't exist in the WindowFunctionMap. +std::optional getWindowFunctionMetadata( + const std::string& name); + using WindowFunctionMap = std::unordered_map; /// Returns a map of all window function names to their registrations. 
diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index e7dcac7f9f35..8a993e8536bc 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -29,6 +29,38 @@ WindowPartition::WindowPartition( for (int i = 0; i < inputMapping_.size(); i++) { columns_.emplace_back(data_->columnAt(inputMapping_[i])); } + + complete_ = true; +} + +WindowPartition::WindowPartition( + RowContainer* data, + const std::vector& inputMapping, + const std::vector>& sortKeyInfo) + : data_(data), inputMapping_(inputMapping), sortKeyInfo_(sortKeyInfo) { + for (int i = 0; i < inputMapping_.size(); i++) { + columns_.emplace_back(data_->columnAt(inputMapping_[i])); + } + + rows_.clear(); + partition_ = folly::Range(rows_.data(), rows_.size()); + complete_ = false; + partial_ = true; +} + +void WindowPartition::addRows(const std::vector& rows) { + rows_.insert(rows_.end(), rows.begin(), rows.end()); + partition_ = folly::Range(rows_.data(), rows_.size()); +} + +void WindowPartition::clearOutputRows(vector_size_t numRows) { + VELOX_CHECK(partial_, "Current WindowPartition should be partial."); + if (!complete_ || (complete_ && rows_.size() >= numRows)) { + data_->eraseRows(folly::Range(rows_.data(), numRows - 1)); + rows_.erase(rows_.begin(), rows_.begin() + numRows - 1); + partition_ = folly::Range(rows_.data(), rows_.size()); + startRow_ += numRows; + } } void WindowPartition::extractColumn( @@ -51,7 +83,7 @@ void WindowPartition::extractColumn( vector_size_t resultOffset, const VectorPtr& result) const { RowContainer::extractColumn( - partition_.data() + partitionOffset, + partition_.data() + partitionOffset - startRow(), numRows, columns_[columnIndex], resultOffset, @@ -130,23 +162,64 @@ bool WindowPartition::compareRowsWithSortKeys(const char* lhs, const char* rhs) return false; } +vector_size_t WindowPartition::findPeerGroupEndIndex( + vector_size_t currentStart, + vector_size_t lastPartitionRow, + std::function peerCompare) { + auto peerEnd = 
currentStart; + while (peerEnd <= lastPartitionRow) { + if (peerCompare( + partition_[currentStart - startRow()], + partition_[peerEnd - startRow()])) { + break; + } + peerEnd++; + } + return peerEnd; +} + std::pair WindowPartition::computePeerBuffers( vector_size_t start, vector_size_t end, vector_size_t prevPeerStart, vector_size_t prevPeerEnd, vector_size_t* rawPeerStarts, - vector_size_t* rawPeerEnds) const { + vector_size_t* rawPeerEnds) { auto peerCompare = [&](const char* lhs, const char* rhs) -> bool { return compareRowsWithSortKeys(lhs, rhs); }; - VELOX_CHECK_LE(end, numRows()); + VELOX_CHECK_LE(end, numRows() + startRow()); - auto lastPartitionRow = numRows() - 1; + auto lastPartitionRow = numRows() + startRow() - 1; auto peerStart = prevPeerStart; auto peerEnd = prevPeerEnd; - for (auto i = start, j = 0; i < end; i++, j++) { + + auto nextStart = start; + + if (partial_ && start > 0) { + lastPartitionRow = end - 1; + auto peerGroup = peerCompare(partition_[0], partition_[1]); + + // The first row is the last row in previous batch. So Delete it after + // compare. + data_->eraseRows(folly::Range(rows_.data(), 1)); + rows_.erase(rows_.begin(), rows_.begin() + 1); + partition_ = folly::Range(rows_.data(), rows_.size()); + + if (!peerGroup) { + peerEnd = findPeerGroupEndIndex(start, lastPartitionRow, peerCompare); + + for (auto j = 0; j < (peerEnd - start); j++) { + rawPeerStarts[j] = peerStart; + rawPeerEnds[j] = peerEnd - 1; + } + + nextStart = peerEnd; + } + } + + for (auto i = nextStart, j = (nextStart - start); i < end; i++, j++) { // When traversing input partition rows, the peers are the rows // with the same values for the ORDER BY clause. These rows // are equal in some ways and affect the results of ranking functions. @@ -160,13 +233,7 @@ std::pair WindowPartition::computePeerBuffers( // Compute peerStart and peerEnd rows for the first row of the partition // or when past the previous peerGroup. 
peerStart = i; - peerEnd = i; - while (peerEnd <= lastPartitionRow) { - if (peerCompare(partition_[peerStart], partition_[peerEnd])) { - break; - } - peerEnd++; - } + peerEnd = findPeerGroupEndIndex(peerStart, lastPartitionRow, peerCompare); } rawPeerStarts[j] = peerStart; diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h index 7073af3a4238..b9714c2334a9 100644 --- a/velox/exec/WindowPartition.h +++ b/velox/exec/WindowPartition.h @@ -20,9 +20,15 @@ /// Simple WindowPartition that builds over the RowContainer used for storing /// the input rows in the Window Operator. This works completely in-memory. +/// WindowPartition supports partial window partitioning to facilitate +/// RowsStreamingWindowBuild, which means that subsequent calculations within +/// the WindowPartition do not need to wait until the current partition is fully +/// ready before commencing. Calculations can begin as soon as a portion of the +/// rows are ready. /// TODO: This implementation will be revised for Spill to disk semantics. namespace facebook::velox::exec { + class WindowPartition { public: /// The WindowPartition is used by the Window operator and WindowFunction @@ -42,11 +48,45 @@ class WindowPartition { const std::vector>& sortKeyInfo); + /// The WindowPartition is used for partial partition when the input data will + /// be a subset of the entire partition. + WindowPartition( + RowContainer* data, + const std::vector& inputMapping, + const std::vector>& + sortKeyInfo); + + /// Adds remaining input rows when building the partial WindowPartition. + void addRows(const std::vector& rows); + + /// Clears the processed rows for the partial WindowPartition. + void clearOutputRows(vector_size_t numRows); + /// Returns the number of rows in the current WindowPartition. vector_size_t numRows() const { return partition_.size(); } + /// Returns the number of rows that will be processed. 
+ vector_size_t numRowsForProcessing() const { + if (startRow_ > 0) { + return partition_.size() - 1; + } + return partition_.size(); + } + + bool isComplete() const { + return complete_; + } + + bool isPartial() const { + return partial_; + } + + void setComplete() { + complete_ = true; + } + /// Copies the values at 'columnIndex' into 'result' (starting at /// 'resultOffset') for the rows at positions in the 'rowNumbers' /// array from the partition input data. @@ -107,7 +147,7 @@ class WindowPartition { vector_size_t prevPeerStart, vector_size_t prevPeerEnd, vector_size_t* rawPeerStarts, - vector_size_t* rawPeerEnds) const; + vector_size_t* rawPeerEnds); /// Sets in 'rawFrameBounds' the frame boundary for the k range /// preceding/following frame. @@ -130,6 +170,11 @@ class WindowPartition { private: bool compareRowsWithSortKeys(const char* lhs, const char* rhs) const; + vector_size_t findPeerGroupEndIndex( + vector_size_t currentStart, + vector_size_t lastRow, + std::function peerCompare); + // Searches for 'currentRow[frameColumn]' in 'orderByColumn' of rows between // 'start' and 'end' in the partition. 'firstMatch' specifies if first or last // row is matched. @@ -162,16 +207,31 @@ class WindowPartition { const vector_size_t* rawPeerBounds, vector_size_t* rawFrameBounds) const; + // Returns the starting offset of the current partial window partition within + // the full partition. + vector_size_t startRow() const { + return startRow_; + } + // The RowContainer associated with the partition. // It is owned by the WindowBuild that creates the partition. RowContainer* data_; + // Holds input rows within the partial partition. + std::vector rows_; + // folly::Range is for the partition rows iterator provided by the // Window operator. The pointers are to rows from a RowContainer owned // by the operator. We can assume these are valid values for the lifetime // of WindowPartition. 
folly::Range partition_; + // Indicates that the partial window partitioning process has been completed. + bool complete_ = false; + + // Indicates partial window partition. + bool partial_ = false; + // Mapping from window input column -> index in data_. This is required // because the WindowBuild reorders data_ to place partition and sort keys // before other columns in data_. But the Window Operator and Function code @@ -189,5 +249,8 @@ class WindowPartition { // corresponding indexes of their input arguments into this vector. // They will request for column vector values at the respective index. std::vector columns_; + + // The partition offset of the first row in rows_. + vector_size_t startRow_ = 0; }; } // namespace facebook::velox::exec diff --git a/velox/exec/fuzzer/AggregationFuzzer.cpp b/velox/exec/fuzzer/AggregationFuzzer.cpp index 456c9644e79e..bb85e8b96201 100644 --- a/velox/exec/fuzzer/AggregationFuzzer.cpp +++ b/velox/exec/fuzzer/AggregationFuzzer.cpp @@ -166,6 +166,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { false /*injectSpill*/, false /*abandonPartial*/, customVerification, + false, customVerifiers, expected, maxDrivers); @@ -177,6 +178,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { true /*injectSpill*/, false /*abandonPartial*/, customVerification, + false, customVerifiers, expected, maxDrivers); @@ -190,6 +192,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { false /*injectSpill*/, true /*abandonPartial*/, customVerification, + false, customVerifiers, expected, maxDrivers); diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp index 310a61629361..d61b79132928 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.cpp +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -16,10 +16,12 @@ #include "velox/exec/fuzzer/AggregationFuzzerBase.h" #include +#include #include "velox/common/base/Fs.h" #include "velox/common/base/VeloxException.h" #include 
"velox/connectors/hive/HiveConnectorSplit.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/RowsStreamingWindowBuild.h" #include "velox/exec/fuzzer/DuckQueryRunner.h" #include "velox/exec/fuzzer/PrestoQueryRunner.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -418,6 +420,7 @@ velox::fuzzer::ResultOrError AggregationFuzzerBase::execute( const std::vector& splits, bool injectSpill, bool abandonPartial, + bool supportRowsStreaming, int32_t maxDrivers) { LOG(INFO) << "Executing query plan: " << std::endl << plan->toString(true, true); @@ -447,6 +450,15 @@ velox::fuzzer::ResultOrError AggregationFuzzerBase::execute( .config(core::QueryConfig::kMaxExtendedPartialAggregationMemory, "0"); } + if (supportRowsStreaming) { + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::RowsStreamingWindowBuild::addInput", + std::function( + ([&](const RowsStreamingWindowBuild* build) { + ASSERT_EQ(build->windowBuildType(), "RowsStreamingWindowBuild"); + }))); + } + if (!splits.empty()) { builder.splits(splits); } @@ -537,6 +549,7 @@ void AggregationFuzzerBase::testPlan( bool injectSpill, bool abandonPartial, bool customVerification, + bool supportRowsStreaming, const std::vector>& customVerifiers, const velox::fuzzer::ResultOrError& expected, int32_t maxDrivers) { @@ -545,6 +558,7 @@ void AggregationFuzzerBase::testPlan( planWithSplits.splits, injectSpill, abandonPartial, + supportRowsStreaming, maxDrivers); compare(actual, customVerification, customVerifiers, expected); } diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h index 32189bf89af5..01a5163c1518 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.h +++ b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -218,6 +218,7 @@ class AggregationFuzzerBase { const std::vector& splits = {}, bool injectSpill = false, bool abandonPartial = false, + bool supportRowsStreaming = false, int32_t maxDrivers = 2); // Will throw if referenceQueryRunner doesn't support @@ 
-251,6 +252,7 @@ class AggregationFuzzerBase { bool injectSpill, bool abandonPartial, bool customVerification, + bool supportRowsStreaming, const std::vector>& customVerifiers, const velox::fuzzer::ResultOrError& expected, int32_t maxDrivers = 2); diff --git a/velox/exec/fuzzer/WindowFuzzer.cpp b/velox/exec/fuzzer/WindowFuzzer.cpp index 734e87a702ac..785aafcc4318 100644 --- a/velox/exec/fuzzer/WindowFuzzer.cpp +++ b/velox/exec/fuzzer/WindowFuzzer.cpp @@ -60,7 +60,7 @@ void WindowFuzzer::addWindowFunctionSignatures( } } -std::tuple WindowFuzzer::generateFrameClause() { +std::tuple WindowFuzzer::generateFrameClause() { auto frameType = [](int value) -> const std::string { switch (value) { case 0: @@ -144,9 +144,12 @@ std::tuple WindowFuzzer::generateFrameClause() { auto frameStart = frameBound(startBoundOptions[startBoundIndex]); auto frameEnd = frameBound(endBoundOptions[endBoundIndex]); + bool isDefaultFrame = + (frameStart == "UNBOUNDED PRECEDING" && frameEnd == "CURRENT ROW"); return std::make_tuple( frameTypeString + " BETWEEN " + frameStart + " AND " + frameEnd, - isRowsFrame); + isRowsFrame, + isDefaultFrame); } std::string WindowFuzzer::generateOrderByClause( @@ -233,7 +236,8 @@ void WindowFuzzer::go() { generateSortingKeysAndOrders("s", argNames, argTypes); } const auto partitionKeys = generateSortingKeys("p", argNames, argTypes); - const auto [frameClause, isRowsFrame] = generateFrameClause(); + const auto [frameClause, isRowsFrame, isDefaultFrame] = + generateFrameClause(); const auto input = generateInputDataWithRowNumber( argNames, argTypes, partitionKeys, signature); // If the function is order-dependent or uses "rows" frame, sort all input @@ -245,6 +249,12 @@ void WindowFuzzer::go() { logVectors(input); + auto windowFunctionMetadata = + exec::getWindowFunctionMetadata(signature.name).value(); + bool supportRowsStreaming = + windowFunctionMetadata.processedUnit == ProcessedUnit::kRows && + (!windowFunctionMetadata.isAggregateWindow || isDefaultFrame); 
+ bool failed = verifyWindow( partitionKeys, sortingKeysAndOrders, @@ -253,7 +263,8 @@ void WindowFuzzer::go() { input, customVerification, customVerifier, - FLAGS_enable_window_reference_verification); + FLAGS_enable_window_reference_verification, + supportRowsStreaming); if (failed) { signatureWithStats.second.numFailed++; } @@ -288,6 +299,7 @@ void WindowFuzzer::testAlternativePlans( const std::string& functionCall, const std::vector& input, bool customVerification, + bool supportRowsStreaming, const std::shared_ptr& customVerifier, const velox::fuzzer::ResultOrError& expected) { std::vector plans; @@ -311,6 +323,8 @@ void WindowFuzzer::testAlternativePlans( {fmt::format("{} over ({})", functionCall, frame)}) .planNode(), {}}); + } else { + supportRowsStreaming = false; } // With TableScan. @@ -348,7 +362,13 @@ void WindowFuzzer::testAlternativePlans( for (const auto& plan : plans) { testPlan( - plan, false, false, customVerification, {customVerifier}, expected); + plan, + false, + false, + customVerification, + supportRowsStreaming, + {customVerifier}, + expected); } } @@ -380,7 +400,8 @@ bool WindowFuzzer::verifyWindow( const std::vector& input, bool customVerification, const std::shared_ptr& customVerifier, - bool enableWindowVerification) { + bool enableWindowVerification, + bool supportRowsStreaming) { SCOPE_EXIT { if (customVerifier) { customVerifier->reset(); @@ -447,6 +468,7 @@ bool WindowFuzzer::verifyWindow( functionCall, input, customVerification, + supportRowsStreaming, customVerifier, resultOrError); diff --git a/velox/exec/fuzzer/WindowFuzzer.h b/velox/exec/fuzzer/WindowFuzzer.h index f53e26098fb4..0bd662f273d0 100644 --- a/velox/exec/fuzzer/WindowFuzzer.h +++ b/velox/exec/fuzzer/WindowFuzzer.h @@ -81,7 +81,7 @@ class WindowFuzzer : public AggregationFuzzerBase { // Return a randomly generated frame clause string together with a boolean // flag indicating whether it is a ROWS frame. 
- std::tuple generateFrameClause(); + std::tuple generateFrameClause(); std::string generateOrderByClause( const std::vector& sortingKeysAndOrders); @@ -105,7 +105,8 @@ class WindowFuzzer : public AggregationFuzzerBase { const std::vector& input, bool customVerification, const std::shared_ptr& customVerifier, - bool enableWindowVerification); + bool enableWindowVerification, + bool supportRowsStreaming); void testAlternativePlans( const std::vector& partitionKeys, @@ -114,6 +115,7 @@ class WindowFuzzer : public AggregationFuzzerBase { const std::string& functionCall, const std::vector& input, bool customVerification, + bool supportRowsStreaming, const std::shared_ptr& customVerifier, const velox::fuzzer::ResultOrError& expected); diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index febdcd743d30..f5fdcbf4e1c6 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -16,6 +16,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" #include "velox/exec/PlanNodeStats.h" +#include "velox/exec/RowsStreamingWindowBuild.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/OperatorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -79,6 +80,74 @@ TEST_F(WindowTest, spill) { ASSERT_GT(stats.spilledPartitions, 0); } +TEST_F(WindowTest, rankWithEqualValue) { + auto data = makeRowVector( + {"c1"}, + {makeFlatVector(std::vector{1, 1, 1, 1, 1, 2, 2})}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "rank() over (order by c1 rows unbounded preceding)"}; + + auto plan = PlanBuilder() + .values({data}) + .orderBy({"c1"}, false) + .streamingWindow(kClauses) + .planNode(); + + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::RowsStreamingWindowBuild::addInput", + std::function( + ([&](const RowsStreamingWindowBuild* build) { + ASSERT_EQ(build->windowBuildType(), "RowsStreamingWindowBuild"); + }))); + 
AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .assertResults( + "SELECT *, rank() over (order by c1 rows unbounded preceding) FROM tmp"); +} + +TEST_F(WindowTest, rowStreamingWindowBuild) { + const vector_size_t size = 1'00; + + auto data = makeRowVector( + {makeFlatVector(size, [](auto row) { return row % 5; }), + makeFlatVector(size, [](auto row) { return row % 50; }), + makeFlatVector( + size, [](auto row) { return row % 3 + 1; }, nullEvery(5)), + makeFlatVector(size, [](auto row) { return row % 40; }), + makeFlatVector(size, [](auto row) { return row; })}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "rank() over (partition by c0, c2 order by c1, c3)", + "dense_rank() over (partition by c0, c2 order by c1, c3)", + "row_number() over (partition by c0, c2 order by c1, c3)", + "sum(c4) over (partition by c0, c2 order by c1, c3)"}; + + auto plan = PlanBuilder() + .values({split(data, 10)}) + .orderBy({"c0", "c2", "c1", "c3"}, false) + .streamingWindow(kClauses) + .planNode(); + + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::RowsStreamingWindowBuild::addInput", + std::function( + ([&](const RowsStreamingWindowBuild* build) { + ASSERT_EQ(build->windowBuildType(), "RowsStreamingWindowBuild"); + }))); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .assertResults( + "SELECT *, rank() over (partition by c0, c2 order by c1, c3), dense_rank() over (partition by c0, c2 order by c1, c3), row_number() over (partition by c0, c2 order by c1, c3), sum(c4) over (partition by c0, c2 order by c1, c3) FROM tmp"); +} + TEST_F(WindowTest, missingFunctionSignature) { auto input = {makeRowVector({ makeFlatVector({1, 2, 3}), diff --git a/velox/functions/lib/window/Rank.cpp b/velox/functions/lib/window/Rank.cpp 
index 646557a2e30c..3962788c7025 100644 --- a/velox/functions/lib/window/Rank.cpp +++ b/velox/functions/lib/window/Rank.cpp @@ -97,9 +97,7 @@ void registerRankInternal( exec::FunctionSignatureBuilder().returnType(returnType).build(), }; - exec::registerWindowFunction( - name, - std::move(signatures), + auto windowFunctionFactory = [name]( const std::vector& /*args*/, const TypePtr& resultType, @@ -107,9 +105,20 @@ void registerRankInternal( velox::memory::MemoryPool* /*pool*/, HashStringAllocator* /*stringAllocator*/, const core::QueryConfig& /*queryConfig*/) - -> std::unique_ptr { - return std::make_unique>(resultType); - }); + -> std::unique_ptr { + return std::make_unique>(resultType); + }; + + if constexpr (TRank == RankType::kRank || TRank == RankType::kDenseRank) { + exec::registerWindowFunction( + name, + std::move(signatures), + std::move(windowFunctionFactory), + {exec::ProcessedUnit::kRows, false}); + } else { + exec::registerWindowFunction( + name, std::move(signatures), std::move(windowFunctionFactory)); + } } void registerRankBigint(const std::string& name) { diff --git a/velox/functions/lib/window/RowNumber.cpp b/velox/functions/lib/window/RowNumber.cpp index 16b7feb0a543..567451c485da 100644 --- a/velox/functions/lib/window/RowNumber.cpp +++ b/velox/functions/lib/window/RowNumber.cpp @@ -84,7 +84,8 @@ void registerRowNumber(const std::string& name, TypeKind resultTypeKind) { const core::QueryConfig& /*queryConfig*/) -> std::unique_ptr { return std::make_unique(resultType); - }); + }, + {exec::ProcessedUnit::kRows, false}); } void registerRowNumberInteger(const std::string& name) {