Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-7243][VL] Suspend the Velox task while reading an input Java iterator to make the task spillable #7748

Merged
merged 7 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,6 @@ int64_t WholeStageResultIterator::spillFixedSize(int64_t size) {
std::string logPrefix{"Spill[" + poolName + "]: "};
int64_t shrunken = memoryManager_->shrink(size);
if (spillStrategy_ == "auto") {
if (task_->numThreads() != 0) {
// Task should have zero running threads, otherwise there's
// possibility that this spill call hangs. See https://github.com/apache/incubator-gluten/issues/7243.
// As of now, non-zero running threads usually happens when:
// 1. Task A spills task B;
// 2. Task A trys to grow buffers created by task B, during which spill is requested on task A again;
LOG(INFO) << fmt::format(
"{} spill is requested on a task {} that has non-zero running threads, which is not currently supported. Skipping.",
logPrefix,
task_->taskId());
return shrunken;
}
int64_t remaining = size - shrunken;
LOG(INFO) << fmt::format("{} trying to request spill for {}.", logPrefix, velox::succinctBytes(remaining));
auto mm = memoryManager_->getMemoryManager();
Expand Down
70 changes: 48 additions & 22 deletions cpp/velox/operators/plannodes/RowVectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,57 +26,81 @@ namespace gluten {
class RowVectorStream {
public:
explicit RowVectorStream(
facebook::velox::exec::DriverCtx* driverCtx,
facebook::velox::memory::MemoryPool* pool,
std::shared_ptr<ResultIterator> iterator,
ResultIterator* iterator,
const facebook::velox::RowTypePtr& outputType)
: iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {}
: driverCtx_(driverCtx), pool_(pool), outputType_(outputType), iterator_(iterator) {}

bool hasNext() {
if (!finished_) {
finished_ = !iterator_->hasNext();
if (finished_) {
return false;
}
return !finished_;
bool hasNext;
{
// We are leaving Velox task execution and are probably entering Spark code through JNI. Suspend the current
// driver to make the current task open to spilling.
//
// When a task is getting spilled, it should have been suspended so has zero running threads, otherwise there's
// possibility that this spill call hangs. See https://github.com/apache/incubator-gluten/issues/7243.
// As of now, non-zero running threads usually happens when:
// 1. Task A spills task B;
// 2. Task A trys to grow buffers created by task B, during which spill is requested on task A again.
facebook::velox::exec::SuspendedSection(driverCtx_->driver);
hasNext = iterator_->hasNext();
}
if (!hasNext) {
finished_ = true;
}
return hasNext;
}

// Convert arrow batch to rowvector and use new output columns
facebook::velox::RowVectorPtr next() {
if (finished_) {
return nullptr;
}
const std::shared_ptr<VeloxColumnarBatch>& vb = VeloxColumnarBatch::from(pool_, iterator_->next());
std::shared_ptr<ColumnarBatch> cb;
{
// We are leaving Velox task execution and are probably entering Spark code through JNI. Suspend the current
// driver to make the current task open to spilling.
facebook::velox::exec::SuspendedSection(driverCtx_->driver);
cb = iterator_->next();
}
const std::shared_ptr<VeloxColumnarBatch>& vb = VeloxColumnarBatch::from(pool_, cb);
auto vp = vb->getRowVector();
VELOX_DCHECK(vp != nullptr);
return std::make_shared<facebook::velox::RowVector>(
vp->pool(), outputType_, facebook::velox::BufferPtr(0), vp->size(), vp->children());
}

private:
bool finished_{false};
std::shared_ptr<ResultIterator> iterator_;
const facebook::velox::RowTypePtr outputType_;
facebook::velox::exec::DriverCtx* driverCtx_;
facebook::velox::memory::MemoryPool* pool_;
const facebook::velox::RowTypePtr outputType_;
ResultIterator* iterator_;

bool finished_{false};
};

class ValueStreamNode final : public facebook::velox::core::PlanNode {
public:
ValueStreamNode(
const facebook::velox::core::PlanNodeId& id,
const facebook::velox::RowTypePtr& outputType,
std::unique_ptr<RowVectorStream> valueStream)
: facebook::velox::core::PlanNode(id), outputType_(outputType), valueStream_(std::move(valueStream)) {
VELOX_CHECK_NOT_NULL(valueStream_);
}
std::shared_ptr<ResultIterator> iterator)
: facebook::velox::core::PlanNode(id), outputType_(outputType), iterator_(std::move(iterator)) {}

const facebook::velox::RowTypePtr& outputType() const override {
return outputType_;
}

const std::vector<facebook::velox::core::PlanNodePtr>& sources() const override {
return kEmptySources;
return kEmptySources_;
};

RowVectorStream* rowVectorStream() const {
return valueStream_.get();
ResultIterator* iterator() const {
return iterator_.get();
}

std::string_view name() const override {
Expand All @@ -91,8 +115,8 @@ class ValueStreamNode final : public facebook::velox::core::PlanNode {
void addDetails(std::stringstream& stream) const override{};

const facebook::velox::RowTypePtr outputType_;
std::unique_ptr<RowVectorStream> valueStream_;
const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources;
std::shared_ptr<ResultIterator> iterator_;
const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
};

class ValueStream : public facebook::velox::exec::SourceOperator {
Expand All @@ -107,15 +131,17 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
operatorId,
valueStreamNode->id(),
valueStreamNode->name().data()) {
valueStream_ = valueStreamNode->rowVectorStream();
ResultIterator* itr = valueStreamNode->iterator();
VELOX_CHECK_NOT_NULL(itr);
rvStream_ = std::make_unique<RowVectorStream>(driverCtx, pool(), itr, outputType_);
}

facebook::velox::RowVectorPtr getOutput() override {
if (finished_) {
return nullptr;
}
if (valueStream_->hasNext()) {
return valueStream_->next();
if (rvStream_->hasNext()) {
return rvStream_->next();
} else {
finished_ = true;
return nullptr;
Expand All @@ -132,7 +158,7 @@ class ValueStream : public facebook::velox::exec::SourceOperator {

private:
bool finished_ = false;
RowVectorStream* valueStream_;
std::unique_ptr<RowVectorStream> rvStream_;
};

class RowVectorStreamOperatorTranslator : public facebook::velox::exec::Operator::PlanNodeTranslator {
Expand Down
3 changes: 1 addition & 2 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx);
iterator = inputIters_[streamIdx];
}
auto valueStream = std::make_unique<RowVectorStream>(pool_, iterator, outputType);
auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, std::move(valueStream));
auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, std::move(iterator));

auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with LogLevelU
* Validate whether this SparkPlan supports to be transformed into substrait node in Native Code.
*/
final def doValidate(): ValidationResult = {
val schemaVaidationResult = BackendsApiManager.getValidatorApiInstance
val schemaValidationResult = BackendsApiManager.getValidatorApiInstance
.doSchemaValidate(schema)
.map {
reason =>
ValidationResult.failed(s"Found schema check failure for $schema, due to: $reason")
}
.getOrElse(ValidationResult.succeeded)
if (!schemaVaidationResult.ok()) {
if (!schemaValidationResult.ok()) {
TestStats.addFallBackClassName(this.getClass.toString)
return schemaVaidationResult
return schemaValidationResult
}
try {
TransformerState.enterValidation
Expand Down
Loading