Skip to content

Commit

Permalink
Add early flushed bytes runtime metrics (facebookincubator#9378)
Browse files Browse the repository at this point in the history
Summary:
Add early flushed bytes in writer that is due to memory reclaiming.

Pull Request resolved: facebookincubator#9378

Reviewed By: xiaoxmeng

Differential Revision: D55786890

Pulled By: tanjialiang

fbshipit-source-id: f81dd83390032bd1b6e944bfee9ee56c0072a2ea
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Apr 6, 2024
1 parent e0ef5a2 commit a42079c
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 24 deletions.
5 changes: 5 additions & 0 deletions velox/common/base/Counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,5 +196,10 @@ void registerVeloxMetrics() {
// Tracks the number of times that we hit the max spill level limit.
DEFINE_METRIC(
kMetricMaxSpillLevelExceededCount, facebook::velox::StatType::COUNT);

// Tracks the total number of bytes in file writers that's pre-maturely
// flushed due to memory reclaiming.
DEFINE_METRIC(
kMetricFileWriterEarlyFlushedRawBytes, facebook::velox::StatType::SUM);
}
} // namespace facebook::velox
3 changes: 3 additions & 0 deletions velox/common/base/Counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,7 @@ constexpr folly::StringPiece kMetricSpillFlushTimeMs{

constexpr folly::StringPiece kMetricSpillWriteTimeMs{
"velox.spill_write_time_ms"};

constexpr folly::StringPiece kMetricFileWriterEarlyFlushedRawBytes{
"velox.file_writer_early_flushed_raw_bytes"};
} // namespace facebook::velox
25 changes: 19 additions & 6 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,15 @@ std::shared_ptr<memory::MemoryPool> HiveDataSink::createWriterPool(
fmt::format("{}.{}", connectorPool->name(), writerId.toString()));
}

void HiveDataSink::setMemoryReclaimers(HiveWriterInfo* writerInfo) {
void HiveDataSink::setMemoryReclaimers(
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats) {
auto* connectorPool = connectorQueryCtx_->connectorMemoryPool();
if (connectorPool->reclaimer() == nullptr) {
return;
}
writerInfo->writerPool->setReclaimer(
WriterReclaimer::create(this, writerInfo));
WriterReclaimer::create(this, writerInfo, ioStats));
writerInfo->sinkPool->setReclaimer(exec::MemoryReclaimer::create());
// NOTE: we set the memory reclaimer for sort pool when we construct the sort
// writer.
Expand Down Expand Up @@ -658,7 +660,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
std::move(writerPool),
std::move(sinkPool),
std::move(sortPool)));
setMemoryReclaimers(writerInfo_.back().get());
ioStats_.emplace_back(std::make_shared<io::IoStatistics>());
setMemoryReclaimers(writerInfo_.back().get(), ioStats_.back().get());

dwio::common::WriterOptions options;
const auto* connectorSessionProperties =
Expand All @@ -681,7 +684,6 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
options.serdeParameters = std::map<std::string, std::string>(
insertTableHandle_->serdeParameters().begin(),
insertTableHandle_->serdeParameters().end());
ioStats_.emplace_back(std::make_shared<io::IoStatistics>());

// Prevents the memory allocation during the writer creation.
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
Expand Down Expand Up @@ -932,9 +934,10 @@ LocationHandlePtr LocationHandle::create(const folly::dynamic& obj) {

std::unique_ptr<memory::MemoryReclaimer> HiveDataSink::WriterReclaimer::create(
HiveDataSink* dataSink,
HiveWriterInfo* writerInfo) {
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats) {
return std::unique_ptr<memory::MemoryReclaimer>(
new HiveDataSink::WriterReclaimer(dataSink, writerInfo));
new HiveDataSink::WriterReclaimer(dataSink, writerInfo, ioStats));
}

bool HiveDataSink::WriterReclaimer::reclaimableBytes(
Expand Down Expand Up @@ -971,8 +974,18 @@ uint64_t HiveDataSink::WriterReclaimer::reclaim(

const uint64_t memoryUsageBeforeReclaim = pool->currentBytes();
const std::string memoryUsageTreeBeforeReclaim = pool->treeMemoryUsage();
const auto writtenBytesBeforeReclaim = ioStats_->rawBytesWritten();
const auto reclaimedBytes =
exec::MemoryReclaimer::reclaim(pool, targetBytes, maxWaitMs, stats);
const auto earlyFlushedRawBytes =
ioStats_->rawBytesWritten() - writtenBytesBeforeReclaim;
addThreadLocalRuntimeStat(
kEarlyFlushedRawBytes,
RuntimeCounter(earlyFlushedRawBytes, RuntimeCounter::Unit::kBytes));
if (earlyFlushedRawBytes > 0) {
RECORD_METRIC_VALUE(
kMetricFileWriterEarlyFlushedRawBytes, earlyFlushedRawBytes);
}
const uint64_t memoryUsageAfterReclaim = pool->currentBytes();
if (memoryUsageAfterReclaim > memoryUsageBeforeReclaim) {
VELOX_FAIL(
Expand Down
20 changes: 16 additions & 4 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ struct HiveWriterIdEq {

class HiveDataSink : public DataSink {
public:
/// The list of runtime stats reported by hive data sink
static constexpr const char* kEarlyFlushedRawBytes = "earlyFlushedRawBytes";

HiveDataSink(
RowTypePtr inputType,
std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
Expand Down Expand Up @@ -448,7 +451,8 @@ class HiveDataSink : public DataSink {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
HiveDataSink* dataSink,
HiveWriterInfo* writerInfo);
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats);

bool reclaimableBytes(
const memory::MemoryPool& pool,
Expand All @@ -461,16 +465,22 @@ class HiveDataSink : public DataSink {
memory::MemoryReclaimer::Stats& stats) override;

private:
WriterReclaimer(HiveDataSink* dataSink, HiveWriterInfo* writerInfo)
WriterReclaimer(
HiveDataSink* dataSink,
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats)
: exec::MemoryReclaimer(),
dataSink_(dataSink),
writerInfo_(writerInfo) {
writerInfo_(writerInfo),
ioStats_(ioStats) {
VELOX_CHECK_NOT_NULL(dataSink_);
VELOX_CHECK_NOT_NULL(writerInfo_);
VELOX_CHECK_NOT_NULL(ioStats_);
}

HiveDataSink* const dataSink_;
HiveWriterInfo* const writerInfo_;
io::IoStatistics* const ioStats_;
};

FOLLY_ALWAYS_INLINE bool sortWrite() const {
Expand All @@ -494,7 +504,9 @@ class HiveDataSink : public DataSink {
std::shared_ptr<memory::MemoryPool> createWriterPool(
const HiveWriterId& writerId);

void setMemoryReclaimers(HiveWriterInfo* writerInfo);
void setMemoryReclaimers(
HiveWriterInfo* writerInfo,
io::IoStatistics* ioStats);

// Compute the partition id and bucket id for each row in 'input'.
void computePartitionAndBucketIds(const RowVectorPtr& input);
Expand Down
3 changes: 3 additions & 0 deletions velox/docs/monitoring/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ Spilling
- The distribution of the amount of time spent on writing spilled rows to
disk in range of [0, 600s] with 20 buckets. It is configured to report the
latency at P50, P90, P99, and P100 percentiles.
* - file_writer_early_flushed_raw_bytes
- Sum
- Number of bytes pre-maturely flushed from file writers because of memory reclaiming.

Hive Connector
--------------
Expand Down
15 changes: 15 additions & 0 deletions velox/docs/monitoring/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ These stats are reported only by HashBuild and HashAggregation operators.
- Time spent on building the hash table from rows collected by all the
hash build operators. This stat is only reported by the HashBuild operator.

TableWriter
-----------
These stats are reported only by TableWriter operator

.. list-table::
:widths: 50 25 50
:header-rows: 1

* - Stats
- Unit
- Description
* - earlyFlushedRawBytes
- bytes
- Number of bytes pre-maturely flushed from file writers because of memory reclaiming.

Spilling
--------
These stats are reported by operators that support spilling.
Expand Down
55 changes: 41 additions & 14 deletions velox/exec/tests/TableWriteTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3431,30 +3431,56 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) {

auto spillDirectory = exec::test::TempDirectoryPath::create();
auto outputDirectory = TempDirectoryPath::create();
core::PlanNodeId tableWriteNodeId;
auto writerPlan =
PlanBuilder()
.values(vectors)
.tableWrite(outputDirectory->path)
.capturePlanNodeId(tableWriteNodeId)
.project({TableWriteTraits::rowCountColumnName()})
.singleAggregation(
{},
{fmt::format(
"sum({})", TableWriteTraits::rowCountColumnName())})
.planNode();

AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(queryCtx)
.maxDrivers(1)
.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, writerSpillEnabled)
.config(core::QueryConfig::kWriterSpillEnabled, writerSpillEnabled)
// Set 0 file writer flush threshold to always trigger flush in test.
.config(core::QueryConfig::kWriterFlushThresholdBytes, 0)
.plan(std::move(writerPlan))
.assertResults(fmt::format("SELECT {}", numRows));

ASSERT_EQ(arbitrator->stats().numFailures, writerSpillEnabled ? 0 : 1);
ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0);
{
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(queryCtx)
.maxDrivers(1)
.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, writerSpillEnabled)
.config(
core::QueryConfig::kWriterSpillEnabled, writerSpillEnabled)
// Set 0 file writer flush threshold to always trigger flush in
// test.
.config(core::QueryConfig::kWriterFlushThresholdBytes, 0)
.plan(std::move(writerPlan))
.assertResults(fmt::format("SELECT {}", numRows));
auto planStats = exec::toPlanStats(task->taskStats());
auto& tableWriteStats =
planStats.at(tableWriteNodeId).operatorStats.at("TableWrite");
if (writerSpillEnabled) {
ASSERT_GT(
tableWriteStats->customStats
.at(HiveDataSink::kEarlyFlushedRawBytes)
.count,
0);
ASSERT_GT(
tableWriteStats->customStats
.at(HiveDataSink::kEarlyFlushedRawBytes)
.sum,
0);
ASSERT_EQ(arbitrator->stats().numFailures, 0);
} else {
ASSERT_EQ(
tableWriteStats->customStats.count(
HiveDataSink::kEarlyFlushedRawBytes),
0);
ASSERT_EQ(arbitrator->stats().numFailures, 1);
}
ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0);
}
waitForAllTasksToBeDeleted(3'000'000);
}
}
Expand Down Expand Up @@ -3541,6 +3567,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) {

ASSERT_EQ(arbitrator->stats().numFailures, writerSpillEnabled ? 0 : 1);
ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0);

waitForAllTasksToBeDeleted(3'000'000);
const auto updatedSpillStats = common::globalSpillStats();
if (writerSpillEnabled) {
Expand Down

0 comments on commit a42079c

Please sign in to comment.