Skip to content

Commit

Permalink
Parallize Spill serialization and IO
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Mar 27, 2024
1 parent 3aa020d commit f1cdacf
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 5 deletions.
7 changes: 7 additions & 0 deletions velox/exec/Spill.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ SpillState::SpillState(
uint64_t targetFileSize,
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
folly::Executor* executor,
memory::MemoryPool* pool,
folly::Synchronized<common::SpillStats>* stats,
const std::string& fileCreateConfig)
Expand All @@ -87,6 +88,7 @@ SpillState::SpillState(
writeBufferSize_(writeBufferSize),
compressionKind_(compressionKind),
fileCreateConfig_(fileCreateConfig),
executor_(executor),
pool_(pool),
stats_(stats),
partitionWriters_(maxPartitions_) {}
Expand All @@ -109,6 +111,10 @@ void SpillState::updateSpilledInputBytes(uint64_t bytes) {
uint64_t SpillState::appendToPartition(
uint32_t partition,
const RowVectorPtr& rows) {
if (!isPartitionSpilled(partition)) {
VELOX_CHECK(
isPartitionSpilled(partition), "Partition {} is not spilled", partition);
}
VELOX_CHECK(
isPartitionSpilled(partition), "Partition {} is not spilled", partition);

Expand All @@ -131,6 +137,7 @@ uint64_t SpillState::appendToPartition(
writeBufferSize_,
fileCreateConfig_,
updateAndCheckSpillLimitCb_,
executor_,
pool_,
stats_);
}
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/Spill.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ class SpillState {
uint64_t targetFileSize,
uint64_t writeBufferSize,
common::CompressionKind compressionKind,
folly::Executor* executor,
memory::MemoryPool* pool,
folly::Synchronized<common::SpillStats>* stats,
const std::string& fileCreateConfig = {});
Expand Down Expand Up @@ -427,6 +428,8 @@ class SpillState {
const uint64_t writeBufferSize_;
const common::CompressionKind compressionKind_;
const std::string fileCreateConfig_;

folly::Executor* const executor_;
memory::MemoryPool* const pool_;
folly::Synchronized<common::SpillStats>* const stats_;

Expand Down
35 changes: 30 additions & 5 deletions velox/exec/SpillFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/exec/SpillFile.h"
#include "velox/common/base/AsyncSource.h"
#include "velox/common/base/RuntimeMetrics.h"
#include "velox/common/file/FileSystems.h"

Expand Down Expand Up @@ -88,6 +89,7 @@ SpillWriter::SpillWriter(
uint64_t writeBufferSize,
const std::string& fileCreateConfig,
common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb,
folly::Executor* executor,
memory::MemoryPool* pool,
folly::Synchronized<common::SpillStats>* stats)
: type_(type),
Expand All @@ -99,6 +101,7 @@ SpillWriter::SpillWriter(
writeBufferSize_(writeBufferSize),
fileCreateConfig_(fileCreateConfig),
updateAndCheckSpillLimitCb_(updateAndCheckSpillLimitCb),
executor_(executor),
pool_(pool),
stats_(stats) {
// NOTE: if the associated spilling operator has specified the sort
Expand Down Expand Up @@ -141,22 +144,32 @@ size_t SpillWriter::numFinishedFiles() const {
return finishedFiles_.size();
}

void SpillWriter::asyncFlush() {
VELOX_CHECK_NULL(flushingBatch_);
VELOX_CHECK_NOT_NULL(batch_);
flushingBatch_ = std::move(batch_);
asyncFlushJob_ = std::make_shared<AsyncSource<uint64_t>>(
[this]() { return std::make_unique<uint64_t>(flush()); });
executor_->add(
[asyncFlushJob = asyncFlushJob_]() { asyncFlushJob->prepare(); });
}

uint64_t SpillWriter::flush() {
if (batch_ == nullptr) {
if (flushingBatch_ == nullptr) {
return 0;
}

auto* file = ensureFile();
VELOX_CHECK_NOT_NULL(file);

IOBufOutputStream out(
*pool_, nullptr, std::max<int64_t>(64 * 1024, batch_->size()));
*pool_, nullptr, std::max<int64_t>(64 * 1024, flushingBatch_->size()));
uint64_t flushTimeUs{0};
{
MicrosecondTimer timer(&flushTimeUs);
batch_->flush(&out);
flushingBatch_->flush(&out);
}
batch_.reset();
flushingBatch_.reset();

uint64_t writeTimeUs{0};
uint64_t writtenBytes{0};
Expand Down Expand Up @@ -193,7 +206,14 @@ uint64_t SpillWriter::write(
if (batch_->size() < writeBufferSize_) {
return 0;
}
return flush();

if (asyncFlushJob_ == nullptr) {
asyncFlush();
return 0;
}
auto flushedBytes = asyncFlushJob_->move();
asyncFlush();
return *flushedBytes;
}

void SpillWriter::updateAppendStats(
Expand Down Expand Up @@ -227,6 +247,11 @@ void SpillWriter::updateSpilledFileStats(uint64_t fileSize) {

void SpillWriter::finishFile() {
checkNotFinished();
if (asyncFlushJob_ != nullptr) {
asyncFlushJob_->move();
asyncFlushJob_.reset();
}
flushingBatch_ = std::move(batch_);
flush();
closeFile();
VELOX_CHECK_NULL(currentFile_);
Expand Down
7 changes: 7 additions & 0 deletions velox/exec/SpillFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <folly/container/F14Set.h>

#include "velox/common/base/AsyncSource.h"
#include "velox/common/base/SpillConfig.h"
#include "velox/common/base/SpillStats.h"
#include "velox/common/compression/Compression.h"
Expand Down Expand Up @@ -119,6 +120,7 @@ class SpillWriter {
uint64_t writeBufferSize,
const std::string& fileCreateConfig,
common::UpdateAndCheckSpillLimitCB& updateAndCheckSpillLimitCb,
folly::Executor* executor,
memory::MemoryPool* pool,
folly::Synchronized<common::SpillStats>* stats);

Expand Down Expand Up @@ -161,6 +163,8 @@ class SpillWriter {
// Closes the current open spill file pointed by 'currentFile_'.
void closeFile();

void asyncFlush();

// Writes data from 'batch_' to the current output file. Returns the actual
// written size.
uint64_t flush();
Expand Down Expand Up @@ -189,14 +193,17 @@ class SpillWriter {
// Updates the aggregated spill bytes of this query, and throws if exceeds
// the max spill bytes limit.
common::UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb_;
folly::Executor* const executor_;
memory::MemoryPool* const pool_;
folly::Synchronized<common::SpillStats>* const stats_;

bool finished_{false};
uint32_t nextFileId_{0};
std::unique_ptr<VectorStreamGroup> batch_;
std::unique_ptr<VectorStreamGroup> flushingBatch_;
std::unique_ptr<SpillWriteFile> currentFile_;
SpillFiles finishedFiles_;
std::shared_ptr<AsyncSource<uint64_t>> asyncFlushJob_;
};

/// Input stream backed by spill file.
Expand Down
1 change: 1 addition & 0 deletions velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ Spiller::Spiller(
targetFileSize,
writeBufferSize,
compressionKind,
executor_,
memory::spillMemoryPool(),
spillStats,
fileCreateConfig) {
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/tests/SpillTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class SpillTest : public ::testing::TestWithParam<common::CompressionKind>,
targetFileSize,
writeBufferSize,
compressionKind_,
executor_.get(),
pool(),
&stats_);
ASSERT_EQ(targetFileSize, state_->targetFileSize());
Expand Down Expand Up @@ -415,6 +416,8 @@ class SpillTest : public ::testing::TestWithParam<common::CompressionKind>,
ASSERT_EQ(runtimeStats_["spillFileSize"].count, spilledFiles.size());
}

std::shared_ptr<folly::CPUThreadPoolExecutor> executor_{
std::make_shared<folly::CPUThreadPoolExecutor>(4)};
folly::Random::DefaultGenerator rng_;
std::shared_ptr<TempDirectoryPath> tempDir_;
memory::MemoryAllocator* allocator_;
Expand Down Expand Up @@ -473,6 +476,7 @@ TEST_P(SpillTest, spillTimestamp) {
1024,
0,
compressionKind_,
executor_.get(),
pool(),
&stats_);
int partitionIndex = 0;
Expand Down

0 comments on commit f1cdacf

Please sign in to comment.