diff --git a/velox/exec/AggregateWindow.cpp b/velox/exec/AggregateWindow.cpp index cb32bd0779c3..b3a003974826 100644 --- a/velox/exec/AggregateWindow.cpp +++ b/velox/exec/AggregateWindow.cpp @@ -426,7 +426,8 @@ void registerAggregateWindowFunction(const std::string& name) { pool, stringAllocator, config); - }); + }, + {exec::ProcessedUnit::kRows, true}); } } } // namespace facebook::velox::exec diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 4bcc61321381..df456f163f62 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -62,6 +62,7 @@ velox_add_library( PlanNodeStats.cpp PrefixSort.cpp ProbeOperatorState.cpp + RowsStreamingWindowBuild.cpp RowContainer.cpp RowNumber.cpp SortBuffer.cpp diff --git a/velox/exec/RowsStreamingWindowBuild.cpp b/velox/exec/RowsStreamingWindowBuild.cpp new file mode 100644 index 000000000000..48fe788e71a5 --- /dev/null +++ b/velox/exec/RowsStreamingWindowBuild.cpp @@ -0,0 +1,86 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/exec/RowsStreamingWindowBuild.h" +#include "velox/common/testutil/TestValue.h" + +namespace facebook::velox::exec { + +RowsStreamingWindowBuild::RowsStreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool, + const common::SpillConfig* spillConfig, + tsan_atomic* nonReclaimableSection) + : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {} + +void RowsStreamingWindowBuild::buildNextInputOrPartition(bool isFinished) { + if (windowPartitions_.size() <= inputPartition_) { + windowPartitions_.push_back(std::make_shared( + data_.get(), inversedInputChannels_, sortKeyInfo_)); + } + + windowPartitions_[inputPartition_]->addRows(inputRows_); + + if (isFinished) { + windowPartitions_[inputPartition_]->setComplete(); + inputPartition_++; + } + + inputRows_.clear(); +} + +void RowsStreamingWindowBuild::addInput(RowVectorPtr input) { + for (auto i = 0; i < inputChannels_.size(); ++i) { + decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i])); + } + + for (auto row = 0; row < input->size(); ++row) { + char* newRow = data_->newRow(); + + for (auto col = 0; col < input->childrenSize(); ++col) { + data_->store(decodedInputVectors_[col], row, newRow, col); + } + + if (previousRow_ != nullptr && + compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) { + buildNextInputOrPartition(true); + } + + if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_) { + buildNextInputOrPartition(false); + } + + inputRows_.push_back(newRow); + previousRow_ = newRow; + } +} + +void RowsStreamingWindowBuild::noMoreInput() { + buildNextInputOrPartition(true); +} + +std::shared_ptr RowsStreamingWindowBuild::nextPartition() { + // The previous partition has already been set to nullptr by the + // Window.cpp#callResetPartition() method. 
+ return windowPartitions_[++outputPartition_]; +} + +bool RowsStreamingWindowBuild::hasNextPartition() { + return windowPartitions_.size() > 0 && + outputPartition_ <= int(windowPartitions_.size() - 2); +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/RowsStreamingWindowBuild.h b/velox/exec/RowsStreamingWindowBuild.h new file mode 100644 index 000000000000..714dde63db5c --- /dev/null +++ b/velox/exec/RowsStreamingWindowBuild.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/WindowBuild.h" + +namespace facebook::velox::exec { + +/// Unlike StreamingWindowBuild, RowsStreamingWindowBuild is capable of +/// processing window functions as rows arrive within a single partition, +/// without the need to wait for the entire partition to be ready. This approach +/// can significantly reduce memory usage, especially when a single partition +/// contains a large amount of data. It is particularly suited for optimizing +/// rank and row_number functions, as well as aggregate window functions with a +/// default frame. 
+class RowsStreamingWindowBuild : public WindowBuild { + public: + RowsStreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool, + const common::SpillConfig* spillConfig, + tsan_atomic* nonReclaimableSection); + + void addInput(RowVectorPtr input) override; + + void spill() override { + VELOX_UNREACHABLE(); + } + + std::optional spilledStats() const override { + return std::nullopt; + } + + void noMoreInput() override; + + bool hasNextPartition() override; + + std::shared_ptr nextPartition() override; + + bool needsInput() override { + // No partitions are available or the currentPartition is the last available + // one, so can consume input rows. + return windowPartitions_.size() == 0 || + outputPartition_ == windowPartitions_.size() - 1; + } + + private: + void buildNextInputOrPartition(bool isFinished); + + // Holds input rows within the current partition. + std::vector inputRows_; + + // Used to compare rows based on partitionKeys. + char* previousRow_ = nullptr; + + // Current partition being output. Used to return the WindowPartitions. + vector_size_t outputPartition_ = -1; + + // Current partition when adding input. Used to construct WindowPartitions. + vector_size_t inputPartition_ = 0; + + // Holds all the WindowPartitions. 
+ std::vector> windowPartitions_; +}; + +} // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index c65009012fdd..400b1edbe636 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -294,11 +294,11 @@ void SortWindowBuild::loadNextPartitionFromSpill() { } } -std::unique_ptr SortWindowBuild::nextPartition() { +std::shared_ptr SortWindowBuild::nextPartition() { if (merge_ != nullptr) { VELOX_CHECK(!sortedRows_.empty(), "No window partitions available") auto partition = folly::Range(sortedRows_.data(), sortedRows_.size()); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } @@ -316,7 +316,7 @@ std::unique_ptr SortWindowBuild::nextPartition() { auto partition = folly::Range( sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index 645949ddb7e0..0caecfe6a5c3 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; private: void ensureInputFits(const RowVectorPtr& input); diff --git a/velox/exec/StreamingWindowBuild.cpp b/velox/exec/StreamingWindowBuild.cpp index 2d855867ebd0..579d7a07dbaf 100644 --- a/velox/exec/StreamingWindowBuild.cpp +++ b/velox/exec/StreamingWindowBuild.cpp @@ -60,7 +60,7 @@ void StreamingWindowBuild::noMoreInput() { partitionStartRows_.push_back(sortedRows_.size()); } -std::unique_ptr StreamingWindowBuild::nextPartition() { +std::shared_ptr StreamingWindowBuild::nextPartition() { VELOX_CHECK_GT( partitionStartRows_.size(), 0, "No window partitions available") @@ -91,7 +91,7 @@ 
std::unique_ptr StreamingWindowBuild::nextPartition() { sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } diff --git a/velox/exec/StreamingWindowBuild.h b/velox/exec/StreamingWindowBuild.h index a9c2e2abf473..99928e2d0217 100644 --- a/velox/exec/StreamingWindowBuild.h +++ b/velox/exec/StreamingWindowBuild.h @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; bool needsInput() override { // No partitions are available or the currentPartition is the last available diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index 4577388006fa..5b5b8c8baca6 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/Window.h" #include "velox/exec/OperatorUtils.h" +#include "velox/exec/RowsStreamingWindowBuild.h" #include "velox/exec/SortWindowBuild.h" #include "velox/exec/StreamingWindowBuild.h" #include "velox/exec/Task.h" @@ -41,8 +42,13 @@ Window::Window( auto* spillConfig = spillConfig_.has_value() ? 
&spillConfig_.value() : nullptr; if (windowNode->inputsSorted()) { - windowBuild_ = std::make_unique( - windowNode, pool(), spillConfig, &nonReclaimableSection_); + if (supportRowsStreaming()) { + windowBuild_ = std::make_unique( + windowNode_, pool(), spillConfig, &nonReclaimableSection_); + } else { + windowBuild_ = std::make_unique( + windowNode, pool(), spillConfig, &nonReclaimableSection_); + } } else { windowBuild_ = std::make_unique( windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_); @@ -54,6 +60,7 @@ void Window::initialize() { VELOX_CHECK_NOT_NULL(windowNode_); createWindowFunctions(); createPeerAndFrameBuffers(); + windowBuild_->setNumRowsPerOutput(numRowsPerOutput_); windowNode_.reset(); } @@ -188,6 +195,46 @@ void Window::createWindowFunctions() { } } +// Support 'rank', 'dense_rank' and +// 'row_number' functions and the agg window function with default frame. +bool Window::supportRowsStreaming() { + bool supportsStreaming = false; + + for (const auto& windowNodeFunction : windowNode_->windowFunctions()) { + const auto& functionName = windowNodeFunction.functionCall->name(); + auto windowFunctionMetadata = + exec::getWindowFunctionMetadata(functionName).value(); + + if (windowFunctionMetadata.processedUnit == ProcessedUnit::kRows) { + const auto& frame = windowNodeFunction.frame; + bool isDefaultFrame = + (frame.startType == + core::WindowNode::BoundType::kUnboundedPreceding && + frame.endType == core::WindowNode::BoundType::kCurrentRow); + if (!windowFunctionMetadata.isAggregateWindow || isDefaultFrame) { + supportsStreaming = true; + velox::common::testutil::TestValue::adjust( + "facebook::velox::exec::Window::supportRowsStreaming", + &supportsStreaming); + } else { + supportsStreaming = false; + velox::common::testutil::TestValue::adjust( + "facebook::velox::exec::Window::supportRowsStreaming", + &supportsStreaming); + break; + } + } else { + supportsStreaming = false; + velox::common::testutil::TestValue::adjust( + 
"facebook::velox::exec::Window::supportRowsStreaming", + &supportsStreaming); + break; + } + } + + return supportsStreaming; +} + void Window::addInput(RowVectorPtr input) { windowBuild_->addInput(input); numRows_ += input->size(); @@ -542,9 +589,12 @@ void Window::callApplyForPartitionRows( vector_size_t endRow, vector_size_t resultOffset, const RowVectorPtr& result) { - getInputColumns(startRow, endRow, resultOffset, result); - + // The lastRow that was retained from the previous batch will be deleted in + // the computePeerAndFrameBuffers method after peer group compare. Therefore, + // the getInputColumns method needs to be called subsequently. computePeerAndFrameBuffers(startRow, endRow); + + getInputColumns(startRow, endRow, resultOffset, result); vector_size_t numFuncs = windowFunctions_.size(); for (auto i = 0; i < numFuncs; ++i) { windowFunctions_[i]->apply( @@ -560,6 +610,10 @@ void Window::callApplyForPartitionRows( const vector_size_t numRows = endRow - startRow; numProcessedRows_ += numRows; partitionOffset_ += numRows; + + if (currentPartition_->isPartial()) { + currentPartition_->clearOutputRows(numRows); + } } vector_size_t Window::callApplyLoop( @@ -573,8 +627,9 @@ vector_size_t Window::callApplyLoop( // This function requires that the currentPartition_ is available for output. VELOX_DCHECK_NOT_NULL(currentPartition_); while (numOutputRowsLeft > 0) { - const auto rowsForCurrentPartition = - currentPartition_->numRows() - partitionOffset_; + auto rowsForCurrentPartition = currentPartition_->isPartial() + ? currentPartition_->numRowsForProcessing() + : currentPartition_->numRowsForProcessing() - partitionOffset_; if (rowsForCurrentPartition <= numOutputRowsLeft) { // Current partition can fit completely in the output buffer. // So output all its rows. 
@@ -585,6 +640,13 @@ vector_size_t Window::callApplyLoop( result); resultIndex += rowsForCurrentPartition; numOutputRowsLeft -= rowsForCurrentPartition; + + if (!currentPartition_->isComplete()) { + // Still more data for the current partition would need to be processed. + // So resume on the next getOutput call. + break; + } + callResetPartition(); if (currentPartition_ == nullptr) { // The WindowBuild doesn't have any more partitions to process right @@ -627,7 +689,15 @@ RowVectorPtr Window::getOutput() { } } - const auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft); + if (!currentPartition_->isComplete() && + (currentPartition_->numRowsForProcessing() == 0)) { + // The numRows may be 1, because we keep the last row in previous batch to + // compare with the first row in next batch to determine whether they are in + // same peer group. + return nullptr; + } + + auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft); auto result = BaseVector::create( outputType_, numOutputRows, operatorCtx_->pool()); diff --git a/velox/exec/Window.h b/velox/exec/Window.h index 393bcc364acc..ef73f11d7cd8 100644 --- a/velox/exec/Window.h +++ b/velox/exec/Window.h @@ -88,6 +88,8 @@ class Window : public Operator { const std::optional end; }; + bool supportRowsStreaming(); + // Creates WindowFunction and frame objects for this operator. void createWindowFunctions(); @@ -165,7 +167,7 @@ class Window : public Operator { // Used to access window partition rows and columns by the window // operator and functions. This structure is owned by the WindowBuild. - std::unique_ptr currentPartition_; + std::shared_ptr currentPartition_; // HashStringAllocator required by functions that allocate out of line // buffers. diff --git a/velox/exec/WindowBuild.h b/velox/exec/WindowBuild.h index 1f9207c4fbd5..0b8f2d3676ad 100644 --- a/velox/exec/WindowBuild.h +++ b/velox/exec/WindowBuild.h @@ -61,12 +61,12 @@ class WindowBuild { /// operator to consume. 
virtual bool hasNextPartition() = 0; - /// The Window operator invokes this function to get the next Window partition - /// to pass along to the WindowFunction. The WindowPartition has APIs to - /// access the underlying columns of Window partition data. Check - /// hasNextPartition() before invoking this function. This function fails if - /// called when no partition is available. - virtual std::unique_ptr nextPartition() = 0; + // The Window operator invokes this function to get the next Window partition + // to pass along to the WindowFunction. The WindowPartition has APIs to access + // the underlying columns of Window partition data. + // Check hasNextPartition() before invoking this function. This function fails + // if called when no partition is available. + virtual std::shared_ptr nextPartition() = 0; /// Returns the average size of input rows in bytes stored in the data /// container of the WindowBuild. @@ -74,6 +74,10 @@ class WindowBuild { return data_->estimateRowSize(); } + void setNumRowsPerOutput(vector_size_t numRowsPerOutput) { + numRowsPerOutput_ = numRowsPerOutput; + } + protected: bool compareRowsWithKeys( const char* lhs, @@ -111,6 +115,9 @@ class WindowBuild { /// Number of input rows. vector_size_t numRows_ = 0; + + // The maximum number of rows that can fit into an output block. 
+ vector_size_t numRowsPerOutput_ = 0; }; } // namespace facebook::velox::exec diff --git a/velox/exec/WindowFunction.cpp b/velox/exec/WindowFunction.cpp index b093024a1dbe..b3419c3ba380 100644 --- a/velox/exec/WindowFunction.cpp +++ b/velox/exec/WindowFunction.cpp @@ -41,13 +41,23 @@ std::optional getWindowFunctionEntry( bool registerWindowFunction( const std::string& name, std::vector signatures, - WindowFunctionFactory factory) { + WindowFunctionFactory factory, + WindowFunctionMetadata metadata) { auto sanitizedName = sanitizeName(name); windowFunctions()[sanitizedName] = { - std::move(signatures), std::move(factory)}; + std::move(signatures), std::move(factory), std::move(metadata)}; return true; } +std::optional getWindowFunctionMetadata( + const std::string& name) { + auto sanitizedName = sanitizeName(name); + if (auto func = getWindowFunctionEntry(sanitizedName)) { + return func.value()->metadata; + } + return std::nullopt; +} + std::optional> getWindowFunctionSignatures( const std::string& name) { auto sanitizedName = sanitizeName(name); diff --git a/velox/exec/WindowFunction.h b/velox/exec/WindowFunction.h index ee0ef26869c1..8e1f91e2cb75 100644 --- a/velox/exec/WindowFunction.h +++ b/velox/exec/WindowFunction.h @@ -31,6 +31,24 @@ struct WindowFunctionArg { std::optional index; }; +/// The ProcessedUnit for calculating the window function. +enum class ProcessedUnit { + // Calculation may start only after all rows within a partition are + // available. + kPartition, + // Calculation may begin as soon as rows are available within a single + // partition, without waiting for all data in the partition to be ready. + kRows, +}; + +/// Indicates whether the function is for an aggregate used as a window +/// function. It also specifies whether the ProcessedUnit of Window function is +/// by partition or by rows. 
+struct WindowFunctionMetadata { + ProcessedUnit processedUnit; + bool isAggregateWindow; +}; + class WindowFunction { public: explicit WindowFunction( @@ -149,7 +167,8 @@ using WindowFunctionFactory = std::function( bool registerWindowFunction( const std::string& name, std::vector signatures, - WindowFunctionFactory factory); + WindowFunctionFactory factory, + WindowFunctionMetadata metadata = {ProcessedUnit::kPartition, false}); /// Returns signatures of the window function with the specified name. /// Returns empty std::optional if function with that name is not found. @@ -159,8 +178,13 @@ std::optional> getWindowFunctionSignatures( struct WindowFunctionEntry { std::vector signatures; WindowFunctionFactory factory; + WindowFunctionMetadata metadata; }; +/// Returns std::nullopt if the function doesn't exist in the WindowFunctionMap. +std::optional getWindowFunctionMetadata( + const std::string& name); + using WindowFunctionMap = std::unordered_map; /// Returns a map of all window function names to their registrations. 
diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index 2783b6b7b2f3..ddf0c40fa12b 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -29,6 +29,45 @@ WindowPartition::WindowPartition( for (int i = 0; i < inputMapping_.size(); i++) { columns_.emplace_back(data_->columnAt(inputMapping_[i])); } + + complete_ = true; +} + +WindowPartition::WindowPartition( + RowContainer* data, + const std::vector& inputMapping, + const std::vector>& sortKeyInfo) + : data_(data), inputMapping_(inputMapping), sortKeyInfo_(sortKeyInfo) { + for (int i = 0; i < inputMapping_.size(); i++) { + columns_.emplace_back(data_->columnAt(inputMapping_[i])); + } + + rows_.clear(); + partition_ = folly::Range(rows_.data(), rows_.size()); + complete_ = false; + partial_ = true; +} + +void WindowPartition::addRows(const std::vector& rows) { + rows_.insert(rows_.end(), rows.begin(), rows.end()); + partition_ = folly::Range(rows_.data(), rows_.size()); +} + +void WindowPartition::clearOutputRows(vector_size_t numRows) { + VELOX_CHECK(partial_, "Current WindowPartition should be partial."); + if (!complete_ || (complete_ && rows_.size() >= numRows)) { + if (complete_ && rows_.size() == 1 && numRows == 1) { + // Directly delete the last row if only one row in current partition. 
+ data_->eraseRows(folly::Range(rows_.data(), numRows)); + rows_.erase(rows_.begin(), rows_.begin() + numRows); + } else { + data_->eraseRows(folly::Range(rows_.data(), numRows - 1)); + rows_.erase(rows_.begin(), rows_.begin() + numRows - 1); + } + + partition_ = folly::Range(rows_.data(), rows_.size()); + startRow_ += numRows; + } } void WindowPartition::extractColumn( @@ -51,7 +90,7 @@ void WindowPartition::extractColumn( vector_size_t resultOffset, const VectorPtr& result) const { RowContainer::extractColumn( - partition_.data() + partitionOffset, + partition_.data() + partitionOffset - startRow(), numRows, columns_[columnIndex], resultOffset, @@ -130,42 +169,78 @@ bool WindowPartition::compareRowsWithSortKeys(const char* lhs, const char* rhs) return false; } +vector_size_t WindowPartition::findPeerGroupEndIndex( + vector_size_t currentStart, + vector_size_t lastPartitionRow, + std::function peerCompare) { + auto peerEnd = currentStart; + while (peerEnd <= lastPartitionRow) { + if (peerCompare( + partition_[currentStart - startRow()], + partition_[peerEnd - startRow()])) { + break; + } + peerEnd++; + } + return peerEnd; +} + std::pair WindowPartition::computePeerBuffers( vector_size_t start, vector_size_t end, vector_size_t prevPeerStart, vector_size_t prevPeerEnd, vector_size_t* rawPeerStarts, - vector_size_t* rawPeerEnds) const { - const auto peerCompare = [&](const char* lhs, const char* rhs) -> bool { + vector_size_t* rawPeerEnds) { + auto peerCompare = [&](const char* lhs, const char* rhs) -> bool { return compareRowsWithSortKeys(lhs, rhs); }; - VELOX_CHECK_LE(end, numRows()); + VELOX_CHECK_LE(end, numRows() + startRow()); - const auto lastPartitionRow = numRows() - 1; + auto lastPartitionRow = numRows() + startRow() - 1; auto peerStart = prevPeerStart; auto peerEnd = prevPeerEnd; - for (auto i = start, j = 0; i < end; ++i, ++j) { - // When traversing input partition rows, the peers are the rows with the - // same values for the ORDER BY clause. 
These rows are equal in some ways - // and affect the results of ranking functions. This logic exploits the fact - // that all rows between the peerStart and peerEnd have the same values for - // rawPeerStarts and rawPeerEnds. So we can compute them just once and reuse - // across the rows in that peer interval. Note: peerStart and peerEnd can be - // maintained across getOutput calls. Hence, they are returned to the - // caller. + + auto nextStart = start; + + if (partial_ && start > 0) { + lastPartitionRow = end - 1; + auto peerGroup = peerCompare(partition_[0], partition_[1]); + + // The first row is the last row in previous batch. So Delete it after + // compare. + data_->eraseRows(folly::Range(rows_.data(), 1)); + rows_.erase(rows_.begin(), rows_.begin() + 1); + partition_ = folly::Range(rows_.data(), rows_.size()); + + if (!peerGroup) { + peerEnd = findPeerGroupEndIndex(start, lastPartitionRow, peerCompare); + + for (auto j = 0; j < (peerEnd - start); j++) { + rawPeerStarts[j] = peerStart; + rawPeerEnds[j] = peerEnd - 1; + } + + nextStart = peerEnd; + } + } + + for (auto i = nextStart, j = (nextStart - start); i < end; i++, j++) { + // When traversing input partition rows, the peers are the rows + // with the same values for the ORDER BY clause. These rows + // are equal in some ways and affect the results of ranking functions. + // This logic exploits the fact that all rows between the peerStart + // and peerEnd have the same values for rawPeerStarts and rawPeerEnds. + // So we can compute them just once and reuse across the rows in that peer + // interval. Note: peerStart and peerEnd can be maintained across + // getOutput calls. Hence, they are returned to the caller. + if (i == 0 || i >= peerEnd) { // Compute peerStart and peerEnd rows for the first row of the partition // or when past the previous peerGroup. 
peerStart = i; - peerEnd = i; - while (peerEnd <= lastPartitionRow) { - if (peerCompare(partition_[peerStart], partition_[peerEnd])) { - break; - } - ++peerEnd; - } + peerEnd = findPeerGroupEndIndex(peerStart, lastPartitionRow, peerCompare); } rawPeerStarts[j] = peerStart; diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h index 7073af3a4238..b9714c2334a9 100644 --- a/velox/exec/WindowPartition.h +++ b/velox/exec/WindowPartition.h @@ -20,9 +20,15 @@ /// Simple WindowPartition that builds over the RowContainer used for storing /// the input rows in the Window Operator. This works completely in-memory. +/// WindowPartition supports partial window partitioning to facilitate +/// RowsStreamingWindowBuild, which means that subsequent calculations within +/// the WindowPartition do not need to wait until the current partition is fully +/// ready before commencing. Calculations can begin as soon as a portion of the +/// rows are ready. /// TODO: This implementation will be revised for Spill to disk semantics. namespace facebook::velox::exec { + class WindowPartition { public: /// The WindowPartition is used by the Window operator and WindowFunction @@ -42,11 +48,45 @@ class WindowPartition { const std::vector>& sortKeyInfo); + /// The WindowPartition is used for partial partition when the input data will + /// be a subset of the entire partition. + WindowPartition( + RowContainer* data, + const std::vector& inputMapping, + const std::vector>& + sortKeyInfo); + + /// Adds remaining input rows when building the partial WindowPartition. + void addRows(const std::vector& rows); + + /// Clears the processed rows for a partial WindowPartition. + void clearOutputRows(vector_size_t numRows); + /// Returns the number of rows in the current WindowPartition. vector_size_t numRows() const { return partition_.size(); } + /// Returns the number of rows that will be processed. 
+ vector_size_t numRowsForProcessing() const { + if (startRow_ > 0) { + return partition_.size() - 1; + } + return partition_.size(); + } + + bool isComplete() const { + return complete_; + } + + bool isPartial() const { + return partial_; + } + + void setComplete() { + complete_ = true; + } + /// Copies the values at 'columnIndex' into 'result' (starting at /// 'resultOffset') for the rows at positions in the 'rowNumbers' /// array from the partition input data. @@ -107,7 +147,7 @@ class WindowPartition { vector_size_t prevPeerStart, vector_size_t prevPeerEnd, vector_size_t* rawPeerStarts, - vector_size_t* rawPeerEnds) const; + vector_size_t* rawPeerEnds); /// Sets in 'rawFrameBounds' the frame boundary for the k range /// preceding/following frame. @@ -130,6 +170,11 @@ class WindowPartition { private: bool compareRowsWithSortKeys(const char* lhs, const char* rhs) const; + vector_size_t findPeerGroupEndIndex( + vector_size_t currentStart, + vector_size_t lastRow, + std::function peerCompare); + // Searches for 'currentRow[frameColumn]' in 'orderByColumn' of rows between // 'start' and 'end' in the partition. 'firstMatch' specifies if first or last // row is matched. @@ -162,16 +207,31 @@ class WindowPartition { const vector_size_t* rawPeerBounds, vector_size_t* rawFrameBounds) const; + // Returns the starting offset of the current partial window partition within + // the full partition. + vector_size_t startRow() const { + return startRow_; + } + // The RowContainer associated with the partition. // It is owned by the WindowBuild that creates the partition. RowContainer* data_; + // Holds input rows within the partial partition. + std::vector rows_; + // folly::Range is for the partition rows iterator provided by the // Window operator. The pointers are to rows from a RowContainer owned // by the operator. We can assume these are valid values for the lifetime // of WindowPartition. 
folly::Range partition_; + // Indicates that the partial window partitioning process has been completed. + bool complete_ = false; + + // Indicates partial window partition. + bool partial_ = false; + // Mapping from window input column -> index in data_. This is required // because the WindowBuild reorders data_ to place partition and sort keys // before other columns in data_. But the Window Operator and Function code @@ -189,5 +249,8 @@ class WindowPartition { // corresponding indexes of their input arguments into this vector. // They will request for column vector values at the respective index. std::vector columns_; + + // The partition offset of the first row in rows_. + vector_size_t startRow_ = 0; }; } // namespace facebook::velox::exec diff --git a/velox/exec/fuzzer/AggregationFuzzer.cpp b/velox/exec/fuzzer/AggregationFuzzer.cpp index b6a0b1d7ffd5..9f85067f9af8 100644 --- a/velox/exec/fuzzer/AggregationFuzzer.cpp +++ b/velox/exec/fuzzer/AggregationFuzzer.cpp @@ -168,6 +168,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { false /*injectSpill*/, false /*abandonPartial*/, customVerification, + false, customVerifiers, expected, maxDrivers); @@ -179,6 +180,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { true /*injectSpill*/, false /*abandonPartial*/, customVerification, + false, customVerifiers, expected, maxDrivers); @@ -192,6 +194,7 @@ class AggregationFuzzer : public AggregationFuzzerBase { false /*injectSpill*/, true /*abandonPartial*/, customVerification, + false, customVerifiers, expected, maxDrivers); diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp index e912d9976c51..942738ee3842 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.cpp +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -16,10 +16,12 @@ #include "velox/exec/fuzzer/AggregationFuzzerBase.h" #include +#include #include "velox/common/base/Fs.h" #include "velox/common/base/VeloxException.h" #include 
"velox/connectors/hive/HiveConnectorSplit.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/RowsStreamingWindowBuild.h" #include "velox/exec/fuzzer/DuckQueryRunner.h" #include "velox/exec/fuzzer/PrestoQueryRunner.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -423,6 +425,7 @@ velox::fuzzer::ResultOrError AggregationFuzzerBase::execute( const std::vector& splits, bool injectSpill, bool abandonPartial, + bool supportRowsStreaming, int32_t maxDrivers) { LOG(INFO) << "Executing query plan: " << std::endl << plan->toString(true, true); @@ -452,6 +455,14 @@ velox::fuzzer::ResultOrError AggregationFuzzerBase::execute( .config(core::QueryConfig::kMaxExtendedPartialAggregationMemory, "0"); } + if (supportRowsStreaming) { + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Window::supportRowsStreaming", + std::function([&](bool* supportRowsStreamingWindow) { + ASSERT_EQ(*supportRowsStreamingWindow, true); + })); + } + if (!splits.empty()) { builder.splits(splits); } @@ -491,6 +502,7 @@ void AggregationFuzzerBase::testPlan( bool injectSpill, bool abandonPartial, bool customVerification, + bool supportRowsStreaming, const std::vector>& customVerifiers, const velox::fuzzer::ResultOrError& expected, int32_t maxDrivers) { @@ -499,6 +511,7 @@ void AggregationFuzzerBase::testPlan( planWithSplits.splits, injectSpill, abandonPartial, + supportRowsStreaming, maxDrivers); compare(actual, customVerification, customVerifiers, expected); } diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h index 53a29d96f254..824646c3eac8 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.h +++ b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -210,6 +210,7 @@ class AggregationFuzzerBase { const std::vector& splits = {}, bool injectSpill = false, bool abandonPartial = false, + bool supportRowsStreaming = false, int32_t maxDrivers = 2); void compare( @@ -234,6 +235,7 @@ class AggregationFuzzerBase { bool injectSpill, bool 
abandonPartial, bool customVerification, + bool supportRowsStreaming, const std::vector>& customVerifiers, const velox::fuzzer::ResultOrError& expected, int32_t maxDrivers = 2); diff --git a/velox/exec/fuzzer/WindowFuzzer.cpp b/velox/exec/fuzzer/WindowFuzzer.cpp index 3ab911dff5a6..7f6e17c07d2c 100644 --- a/velox/exec/fuzzer/WindowFuzzer.cpp +++ b/velox/exec/fuzzer/WindowFuzzer.cpp @@ -269,7 +269,8 @@ void WindowFuzzer::go() { input, customVerification, customVerifier, - FLAGS_enable_window_reference_verification); + FLAGS_enable_window_reference_verification, + false); if (failed) { signatureWithStats.second.numFailed++; } @@ -304,6 +305,7 @@ void WindowFuzzer::testAlternativePlans( const std::string& functionCall, const std::vector& input, bool customVerification, + bool supportRowsStreaming, const std::shared_ptr& customVerifier, const velox::fuzzer::ResultOrError& expected) { std::vector plans; @@ -327,6 +329,8 @@ void WindowFuzzer::testAlternativePlans( {fmt::format("{} over ({})", functionCall, frame)}) .planNode(), {}}); + } else { + supportRowsStreaming = false; } // With TableScan. 
@@ -364,7 +368,13 @@ void WindowFuzzer::testAlternativePlans( for (const auto& plan : plans) { testPlan( - plan, false, false, customVerification, {customVerifier}, expected); + plan, + false, /* injectSpill */ + false, /* abandonPartial */ + customVerification, + supportRowsStreaming, + {customVerifier}, + expected); } } @@ -396,7 +406,8 @@ bool WindowFuzzer::verifyWindow( const std::vector& input, bool customVerification, const std::shared_ptr& customVerifier, - bool enableWindowVerification) { + bool enableWindowVerification, + bool supportRowsStreaming) { SCOPE_EXIT { if (customVerifier) { customVerifier->reset(); @@ -464,6 +475,7 @@ bool WindowFuzzer::verifyWindow( functionCall, input, customVerification, + supportRowsStreaming, customVerifier, resultOrError); diff --git a/velox/exec/fuzzer/WindowFuzzer.h b/velox/exec/fuzzer/WindowFuzzer.h index 449034114958..0987171471b7 100644 --- a/velox/exec/fuzzer/WindowFuzzer.h +++ b/velox/exec/fuzzer/WindowFuzzer.h @@ -109,7 +109,8 @@ class WindowFuzzer : public AggregationFuzzerBase { const std::vector& input, bool customVerification, const std::shared_ptr& customVerifier, - bool enableWindowVerification); + bool enableWindowVerification, + bool supportRowsStreaming); void testAlternativePlans( const std::vector& partitionKeys, @@ -118,6 +119,7 @@ class WindowFuzzer : public AggregationFuzzerBase { const std::string& functionCall, const std::vector& input, bool customVerification, + bool supportRowsStreaming, const std::shared_ptr& customVerifier, const velox::fuzzer::ResultOrError& expected); diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index febdcd743d30..3ff26d39f90d 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -16,6 +16,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" #include "velox/exec/PlanNodeStats.h" +#include "velox/exec/RowsStreamingWindowBuild.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include
"velox/exec/tests/utils/OperatorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -79,6 +80,136 @@ TEST_F(WindowTest, spill) { ASSERT_GT(stats.spilledPartitions, 0); } +TEST_F(WindowTest, rankWithEqualValue) { /* Runs rank() over an input with repeated c1 values, with kPreferredOutputBatchRows and kMaxOutputBatchRows set to 2, and asserts the Window::supportRowsStreaming hook receives true. */ + auto data = makeRowVector( + {"c1"}, + {makeFlatVector(std::vector{1, 1, 1, 1, 1, 2, 2})}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "rank() over (order by c1 rows unbounded preceding)"}; + + auto plan = PlanBuilder() + .values({data}) + .orderBy({"c1"}, false) + .streamingWindow(kClauses) + .planNode(); + + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Window::supportRowsStreaming", + std::function([&](bool* supportRowsStreamingWindow) { + ASSERT_EQ(*supportRowsStreamingWindow, true); + })); + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .assertResults( + "SELECT *, rank() over (order by c1 rows unbounded preceding) FROM tmp"); +} + +TEST_F(WindowTest, rowStreamingWindowBuild) { /* Runs rank, dense_rank, row_number and sum with the same partition and order keys after an orderBy on those keys; asserts the Window::supportRowsStreaming hook receives true and checks results against the equivalent SQL query. */ + const vector_size_t size = 1'00; + + auto data = makeRowVector( + {makeFlatVector(size, [](auto row) { return row % 5; }), + makeFlatVector(size, [](auto row) { return row % 50; }), + makeFlatVector( + size, [](auto row) { return row % 3 + 1; }, nullEvery(5)), + makeFlatVector(size, [](auto row) { return row % 40; }), + makeFlatVector(size, [](auto row) { return row; })}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "rank() over (partition by c0, c2 order by c1, c3)", + "dense_rank() over (partition by c0, c2 order by c1, c3)", + "row_number() over (partition by c0, c2 order by c1, c3)", + "sum(c4) over (partition by c0, c2 order by c1, c3)"}; + + auto plan = PlanBuilder() + .values({split(data, 10)}) + .orderBy({"c0", "c2", "c1", "c3"}, false) + .streamingWindow(kClauses) + .planNode(); + + SCOPED_TESTVALUE_SET( +
"facebook::velox::exec::Window::supportRowsStreaming", + std::function([&](bool* supportRowsStreamingWindow) { + ASSERT_EQ(*supportRowsStreamingWindow, true); + })); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .assertResults( + "SELECT *, rank() over (partition by c0, c2 order by c1, c3), dense_rank() over (partition by c0, c2 order by c1, c3), row_number() over (partition by c0, c2 order by c1, c3), sum(c4) over (partition by c0, c2 order by c1, c3) FROM tmp"); +} + +TEST_F(WindowTest, aggregationWithNonDefaultFrame) { /* Runs sum() with a frame of range between unbounded preceding and unbounded following; asserts the Window::supportRowsStreaming hook receives false. */ + const vector_size_t size = 1'00; + + auto data = makeRowVector( + {makeFlatVector(size, [](auto row) { return row % 5; }), + makeFlatVector(size, [](auto row) { return row % 50; }), + makeFlatVector( + size, [](auto row) { return row % 3 + 1; }, nullEvery(5)), + makeFlatVector(size, [](auto row) { return row % 40; }), + makeFlatVector(size, [](auto row) { return row; })}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "sum(c4) over (partition by c0, c2 order by c1, c3 range between unbounded preceding and unbounded following)"}; + + auto plan = PlanBuilder() + .values({split(data, 10)}) + .orderBy({"c0", "c2", "c1", "c3"}, false) + .streamingWindow(kClauses) + .planNode(); + + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Window::supportRowsStreaming", + std::function([&](bool* supportRowsStreamingWindow) { + ASSERT_EQ(*supportRowsStreamingWindow, false); + })); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .assertResults( + "SELECT *, sum(c4) over (partition by c0, c2 order by c1, c3 range between unbounded preceding and unbounded following) FROM tmp"); +} + +TEST_F(WindowTest, nonRowsStreamingWindow) { /* Runs first_value and nth_value with a rows-unbounded-preceding frame; asserts the Window::supportRowsStreaming hook receives false. */ + auto data = makeRowVector( + {"c1"}, + {makeFlatVector(std::vector{1, 1, 1, 1, 1, 2, 2})}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "first_value(c1) over
(order by c1 rows unbounded preceding)", + "nth_value(c1, 1) over (order by c1 rows unbounded preceding)"}; + + auto plan = PlanBuilder() + .values({data}) + .orderBy({"c1"}, false) + .streamingWindow(kClauses) + .planNode(); + + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Window::supportRowsStreaming", + std::function([&](bool* supportRowsStreamingWindow) { + ASSERT_EQ(*supportRowsStreamingWindow, false); + })); + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .assertResults( + "SELECT *, first_value(c1) over (order by c1 rows unbounded preceding), nth_value(c1, 1) over (order by c1 rows unbounded preceding) FROM tmp"); +} + TEST_F(WindowTest, missingFunctionSignature) { auto input = {makeRowVector({ makeFlatVector({1, 2, 3}), diff --git a/velox/functions/lib/window/Rank.cpp b/velox/functions/lib/window/Rank.cpp index 646557a2e30c..3962788c7025 100644 --- a/velox/functions/lib/window/Rank.cpp +++ b/velox/functions/lib/window/Rank.cpp @@ -97,9 +97,7 @@ void registerRankInternal( exec::FunctionSignatureBuilder().returnType(returnType).build(), }; - exec::registerWindowFunction( - name, - std::move(signatures), + auto windowFunctionFactory = [name]( const std::vector& /*args*/, const TypePtr& resultType, @@ -107,9 +105,20 @@ void registerRankInternal( velox::memory::MemoryPool* /*pool*/, HashStringAllocator* /*stringAllocator*/, const core::QueryConfig& /*queryConfig*/) - -> std::unique_ptr { - return std::make_unique>(resultType); - }); + -> std::unique_ptr { + return std::make_unique>(resultType); + }; + + if constexpr (TRank == RankType::kRank || TRank == RankType::kDenseRank) { + exec::registerWindowFunction( + name, + std::move(signatures), + std::move(windowFunctionFactory), + {exec::ProcessedUnit::kRows, false}); /* only kRank and kDenseRank pass explicit kRows metadata; other RankType values take the else branch */ + } else { + exec::registerWindowFunction( + name,
std::move(signatures), std::move(windowFunctionFactory)); /* registered without an explicit metadata argument */ + } } void registerRankBigint(const std::string& name) { diff --git a/velox/functions/lib/window/RowNumber.cpp b/velox/functions/lib/window/RowNumber.cpp index 16b7feb0a543..567451c485da 100644 --- a/velox/functions/lib/window/RowNumber.cpp +++ b/velox/functions/lib/window/RowNumber.cpp @@ -84,7 +84,8 @@ void registerRowNumber(const std::string& name, TypeKind resultTypeKind) { const core::QueryConfig& /*queryConfig*/) -> std::unique_ptr { return std::make_unique(resultType); - }, + {exec::ProcessedUnit::kRows, false}); /* metadata: ProcessedUnit::kRows plus a bool that is false here and true in AggregateWindow.cpp; NOTE(review): presumably an is-aggregate flag, confirm against the metadata definition */ } void registerRowNumberInteger(const std::string& name) {