Skip to content

Commit

Permalink
[WIP] parallelize connector reclaim
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed May 22, 2024
1 parent 4a1a391 commit 6ba6c20
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
2 changes: 1 addition & 1 deletion velox/exec/TableWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ uint64_t TableWriter::ConnectorReclaimer::reclaim(
return 0;
}
RuntimeStatWriterScopeGuard opStatsGuard(op_);
return memory::MemoryReclaimer::reclaim(pool, targetBytes, maxWaitMs, stats);
return ParallelMemoryReclaimer::reclaim(pool, targetBytes, maxWaitMs, stats);
}

// static
Expand Down
20 changes: 15 additions & 5 deletions velox/exec/TableWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ class TableWriteTraits {
velox::memory::MemoryPool* pool);
};

/**
* Implements a simple table writer operator.
*/
class TableWriter : public Operator {
public:
TableWriter(
Expand Down Expand Up @@ -149,8 +146,12 @@ class TableWriter : public Operator {
// The memory reclaimer customized for connector which interface with the
// memory arbitrator to reclaim memory from the file writers created within
// the connector.
class ConnectorReclaimer : public Operator::MemoryReclaimer {
class ConnectorReclaimer : public exec::ParallelMemoryReclaimer {
public:
static inline std::shared_ptr<folly::CPUThreadPoolExecutor>
connectorExecutor = std::make_shared<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency() * 2,
std::make_shared<folly::NamedThreadFactory>("ConnectorExecutor"));
static std::unique_ptr<memory::MemoryReclaimer>
create(DriverCtx* driverCtx, Operator* op, bool canReclaim);

Expand All @@ -171,14 +172,23 @@ class TableWriter : public Operator {
void abort(memory::MemoryPool* pool, const std::exception_ptr& /* error */)
override {}

std::shared_ptr<Driver> ensureDriver() const {
return driver_.lock();
}

private:
ConnectorReclaimer(
const std::shared_ptr<Driver>& driver,
Operator* op,
bool canReclaim)
: Operator::MemoryReclaimer(driver, op), canReclaim_(canReclaim) {}
: ParallelMemoryReclaimer(connectorExecutor.get()),
canReclaim_(canReclaim),
driver_(driver),
op_(op) {}

const bool canReclaim_{false};
std::weak_ptr<Driver> driver_;
Operator* op_;
};

void createDataSink();
Expand Down

0 comments on commit 6ba6c20

Please sign in to comment.