diff --git a/velox/exec/Spill.cpp b/velox/exec/Spill.cpp index c6aa380ccdbb5..086603b578e69 100644 --- a/velox/exec/Spill.cpp +++ b/velox/exec/Spill.cpp @@ -74,6 +74,7 @@ SpillState::SpillState( uint64_t targetFileSize, uint64_t writeBufferSize, common::CompressionKind compressionKind, + folly::Executor* executor, memory::MemoryPool* pool, folly::Synchronized* stats, const std::string& fileCreateConfig) @@ -87,6 +88,7 @@ SpillState::SpillState( writeBufferSize_(writeBufferSize), compressionKind_(compressionKind), fileCreateConfig_(fileCreateConfig), + executor_(executor), pool_(pool), stats_(stats), partitionWriters_(maxPartitions_) {} @@ -109,6 +111,10 @@ void SpillState::updateSpilledInputBytes(uint64_t bytes) { uint64_t SpillState::appendToPartition( uint32_t partition, const RowVectorPtr& rows) { + if (!isPartitionSpilled(partition)) { + VELOX_CHECK( + isPartitionSpilled(partition), "Partition {} is not spilled", partition); + } VELOX_CHECK( isPartitionSpilled(partition), "Partition {} is not spilled", partition); @@ -131,6 +137,7 @@ uint64_t SpillState::appendToPartition( writeBufferSize_, fileCreateConfig_, updateAndCheckSpillLimitCb_, + executor_, pool_, stats_); } diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h index c350df9839dfb..948f6483e16d3 100644 --- a/velox/exec/Spill.h +++ b/velox/exec/Spill.h @@ -330,6 +330,7 @@ class SpillState { uint64_t targetFileSize, uint64_t writeBufferSize, common::CompressionKind compressionKind, + folly::Executor* executor, memory::MemoryPool* pool, folly::Synchronized* stats, const std::string& fileCreateConfig = {}); @@ -427,6 +428,8 @@ class SpillState { const uint64_t writeBufferSize_; const common::CompressionKind compressionKind_; const std::string fileCreateConfig_; + + folly::Executor* const executor_; memory::MemoryPool* const pool_; folly::Synchronized* const stats_; diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp index 46ae2cc9b44f6..df534ca764178 100644 --- a/velox/exec/SpillFile.cpp +++ b/velox/exec/SpillFile.cpp @@ -15,6 +15,7 @@ */ #include "velox/exec/SpillFile.h" +#include "velox/common/base/AsyncSource.h" #include "velox/common/base/RuntimeMetrics.h" #include "velox/common/file/FileSystems.h" @@ -88,6 +89,7 @@ SpillWriter::SpillWriter( uint64_t writeBufferSize, const std::string& fileCreateConfig, common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb, + folly::Executor* executor, memory::MemoryPool* pool, folly::Synchronized* stats) : type_(type), @@ -99,6 +101,7 @@ SpillWriter::SpillWriter( writeBufferSize_(writeBufferSize), fileCreateConfig_(fileCreateConfig), updateAndCheckSpillLimitCb_(updateAndCheckSpillLimitCb), + executor_(executor), pool_(pool), stats_(stats) { // NOTE: if the associated spilling operator has specified the sort @@ -141,8 +144,18 @@ size_t SpillWriter::numFinishedFiles() const { return finishedFiles_.size(); } +void SpillWriter::asyncFlush() { + VELOX_CHECK_NULL(flushingBatch_); + VELOX_CHECK_NOT_NULL(batch_); + flushingBatch_ = std::move(batch_); + asyncFlushJob_ = std::make_shared>( + [this]() { return std::make_unique(flush()); }); + executor_->add( + [asyncFlushJob = asyncFlushJob_]() { asyncFlushJob->prepare(); }); +} + uint64_t SpillWriter::flush() { - if (batch_ == nullptr) { + if (flushingBatch_ == nullptr) { return 0; } @@ -150,13 +163,13 @@ uint64_t SpillWriter::flush() { VELOX_CHECK_NOT_NULL(file); IOBufOutputStream out( - *pool_, nullptr, std::max(64 * 1024, batch_->size())); + *pool_, nullptr, std::max(64 * 1024, flushingBatch_->size())); uint64_t flushTimeUs{0}; { MicrosecondTimer timer(&flushTimeUs); - batch_->flush(&out); + flushingBatch_->flush(&out); } - batch_.reset(); + flushingBatch_.reset(); uint64_t writeTimeUs{0}; uint64_t writtenBytes{0}; @@ -193,7 +206,14 @@ uint64_t SpillWriter::write( if (batch_->size() < writeBufferSize_) { return 0; } - return flush(); + + if (asyncFlushJob_ == nullptr) { + asyncFlush(); + return 0; + } + auto flushedBytes = asyncFlushJob_->move(); + asyncFlush(); + return *flushedBytes; } void SpillWriter::updateAppendStats( @@ -227,6 +247,11 @@ void SpillWriter::updateSpilledFileStats(uint64_t fileSize) { void SpillWriter::finishFile() { checkNotFinished(); + if (asyncFlushJob_ != nullptr) { + asyncFlushJob_->move(); + asyncFlushJob_.reset(); + } + flushingBatch_ = std::move(batch_); flush(); closeFile(); VELOX_CHECK_NULL(currentFile_); diff --git a/velox/exec/SpillFile.h b/velox/exec/SpillFile.h index eee5a1727119b..6f5b4530c6a5e 100644 --- a/velox/exec/SpillFile.h +++ b/velox/exec/SpillFile.h @@ -18,6 +18,7 @@ #include +#include "velox/common/base/AsyncSource.h" #include "velox/common/base/SpillConfig.h" #include "velox/common/base/SpillStats.h" #include "velox/common/compression/Compression.h" @@ -119,6 +120,7 @@ class SpillWriter { uint64_t writeBufferSize, const std::string& fileCreateConfig, common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb, + folly::Executor* executor, memory::MemoryPool* pool, folly::Synchronized* stats); @@ -161,6 +163,8 @@ class SpillWriter { // Closes the current open spill file pointed by 'currentFile_'. void closeFile(); + void asyncFlush(); + // Writes data from 'batch_' to the current output file. Returns the actual // written size. uint64_t flush(); @@ -189,14 +193,17 @@ class SpillWriter { // Updates the aggregated spill bytes of this query, and throws if exceeds // the max spill bytes limit. common::UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb_; + folly::Executor* const executor_; memory::MemoryPool* const pool_; folly::Synchronized* const stats_; bool finished_{false}; uint32_t nextFileId_{0}; std::unique_ptr batch_; + std::unique_ptr flushingBatch_; std::unique_ptr currentFile_; SpillFiles finishedFiles_; + std::shared_ptr> asyncFlushJob_; }; /// Input stream backed by spill file. diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index cd63c5545a2a9..4ad73801cbf07 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -224,6 +224,7 @@ Spiller::Spiller( targetFileSize, writeBufferSize, compressionKind, + executor_, memory::spillMemoryPool(), spillStats, fileCreateConfig) { diff --git a/velox/exec/tests/SpillTest.cpp b/velox/exec/tests/SpillTest.cpp index a6238db7611d1..ff84dbad013bd 100644 --- a/velox/exec/tests/SpillTest.cpp +++ b/velox/exec/tests/SpillTest.cpp @@ -166,6 +166,7 @@ class SpillTest : public ::testing::TestWithParam, targetFileSize, writeBufferSize, compressionKind_, + executor_.get(), pool(), &stats_); ASSERT_EQ(targetFileSize, state_->targetFileSize()); @@ -415,6 +416,8 @@ class SpillTest : public ::testing::TestWithParam, ASSERT_EQ(runtimeStats_["spillFileSize"].count, spilledFiles.size()); } + std::shared_ptr executor_{ + std::make_shared(4)}; folly::Random::DefaultGenerator rng_; std::shared_ptr tempDir_; memory::MemoryAllocator* allocator_; @@ -473,6 +476,7 @@ TEST_P(SpillTest, spillTimestamp) { 1024, 0, compressionKind_, + executor_.get(), pool(), &stats_); int partitionIndex = 0;