-
Notifications
You must be signed in to change notification settings - Fork 434
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[VL] Iterator's and its payloads' lifecycle improvements #3526
Changes from all commits
0bab9dc
07dee0b
fd95d00
e75cea8
dc63153
82a8df7
60c46e6
24a63eb
315b87e
f8bbcd0
c6acb45
c7db848
b6afc69
e30cc4a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ package io.glutenproject.execution | |
import io.glutenproject.columnarbatch.ColumnarBatches | ||
import io.glutenproject.extension.ValidationResult | ||
import io.glutenproject.memory.nmm.NativeMemoryManagers | ||
import io.glutenproject.utils.Iterators | ||
import io.glutenproject.vectorized.NativeColumnarToRowJniWrapper | ||
|
||
import org.apache.spark.rdd.RDD | ||
|
@@ -28,7 +29,6 @@ import org.apache.spark.sql.execution.SparkPlan | |
import org.apache.spark.sql.execution.metric.SQLMetric | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.sql.vectorized.ColumnarBatch | ||
import org.apache.spark.util.TaskResources | ||
|
||
import scala.collection.JavaConverters._ | ||
|
||
|
@@ -98,26 +98,13 @@ object VeloxColumnarToRowExec { | |
|
||
// TODO:: pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast | ||
val jniWrapper = NativeColumnarToRowJniWrapper.create() | ||
var closed = false | ||
val c2rId = jniWrapper.nativeColumnarToRowInit( | ||
NativeMemoryManagers.contextInstance("ColumnarToRow").getNativeInstanceHandle) | ||
|
||
TaskResources.addRecycler(s"ColumnarToRow_$c2rId", 100) { | ||
if (!closed) { | ||
jniWrapper.nativeClose(c2rId) | ||
closed = true | ||
} | ||
} | ||
|
||
val res: Iterator[Iterator[InternalRow]] = new Iterator[Iterator[InternalRow]] { | ||
|
||
override def hasNext: Boolean = { | ||
val hasNext = batches.hasNext | ||
if (!hasNext && !closed) { | ||
jniWrapper.nativeClose(c2rId) | ||
closed = true | ||
} | ||
hasNext | ||
batches.hasNext | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems we did not call
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Didn't change any code about payload closing. The deleted lines were for iterator completion so moved to
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see it is closed by itself, then it seems c2r did not call
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Yes as you see the batches are closed manually within some |
||
} | ||
|
||
override def next(): Iterator[InternalRow] = { | ||
|
@@ -170,6 +157,13 @@ object VeloxColumnarToRowExec { | |
} | ||
} | ||
} | ||
res.flatten | ||
Iterators | ||
.wrap(res.flatten) | ||
.protectInvocationFlow() // Spark may call `hasNext()` again after a false output which | ||
// is not allowed by Gluten iterators. E.g. GroupedIterator#fetchNextGroupIterator | ||
.recycleIterator { | ||
jniWrapper.nativeClose(c2rId) | ||
} | ||
.create() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep it in the `CHIteratorApi`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any suggested change? I only removed the `override` modifier, but I am not sure where to put this method yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is OK to me.