[VL] Iterator's and its payloads' lifecycle improvements #3526
Run Gluten Clickhouse CI |
9a8a0a8 to 5c2b149
a748224 to 31e2290
override def next(): A = {
  val a: A = in.next()
  closer.synchronized {
I'm a bit confused about why we need `synchronized` here. Does it mean `hasNext` and `next` can be called at the same time?
> Does it mean hasNext and next can be called at the same time?

That case is unusual. However, another case is possible: the iterator may be read asynchronously by another thread (say, Velox sometimes hands its sub-tasks over to a background thread), while the lambda passed into `TaskResources.addRecycler` is called from the original Spark task thread.
BTW, I haven't tested all the cases but rather programmed defensively. It would be hard to debug if we actually ran into threading issues caused by a lack of locking here.
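The locking concern described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the class name `LockedClosingIterator`, the `close()` method, and the `current`/`closed` fields are hypothetical, and only `closer`, `hasNext`, and `next` echo names from the diff. Unlike the diff, the sketch holds the lock for the whole of `next()` to keep the example short.

```scala
// Hypothetical sketch: an iterator whose payload can be released from another thread.
final class LockedClosingIterator[A](in: Iterator[A], closeCallback: A => Unit)
    extends Iterator[A] {
  private val closer = new Object // lock shared by the reader and the recycler
  private var current: Option[A] = None // payload handed out most recently
  private var closed = false

  // May be invoked from the task thread, e.g. by a task-completion hook.
  def close(): Unit = closer.synchronized {
    if (!closed) {
      current.foreach(closeCallback)
      current = None
      closed = true
    }
  }

  override def hasNext: Boolean = closer.synchronized {
    !closed && in.hasNext
  }

  // May be invoked from a background reader thread.
  override def next(): A = closer.synchronized {
    if (closed) throw new IllegalStateException("iterator already closed")
    val a = in.next()
    current.foreach(closeCallback) // release the previously returned payload
    current = Some(a)
    a
  }
}
```

Because both `next()` and `close()` take the same lock, a payload is closed at most once even if the recycler fires while a background thread is still reading.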
91d0e8b to 24a63eb
This impacts the CH backend's execution as well. Would you please help review? Thanks.
/Benchmark Velox
class WrapperBuilder[A](in: Iterator[A]) { // FIXME how to make the ctor companion-private?
  private var wrapped: Iterator[A] = in

  def recyclePayload(closeCallback: (A) => Unit): WrapperBuilder[A] = {
Thank you @zhztheplayer for the refactor. Can you explain a bit more why you introduced the new word "payload" instead of using ColumnarBatch in Java? Do you mean that something other than ColumnarBatch will call this method? I'm wondering if we can add a `recycleColumnarBatch` method to make it clear.
In `Iterator[A]`, `A` should be the type of the iterator's "payloads". It's also possible that `A` is not `ColumnarBatch`; a typical case is C2R, which creates a row iterator as output.
Does `payload` sound very confusing here? `element` could be another choice, but I felt it probably sounds too general for Gluten's use case.
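To illustrate why the callback is generic in `A`, here is a minimal sketch under stated assumptions. `FakeBatch`, `FakeRow`, and `PayloadSketch` are hypothetical stand-ins and are not Gluten or Spark types; the choice to release the previous payload on both `hasNext` and `next()` is also an assumption of this sketch rather than something the PR states.

```scala
// Hypothetical payload types used only for illustration.
final case class FakeBatch(id: Int) {
  var closed = false
  def close(): Unit = { closed = true }
}
final case class FakeRow(value: String)

object PayloadSketch {
  // Wraps `in` so that the previously returned payload is passed to
  // closeCallback as soon as the consumer asks for more data.
  def recyclePayload[A](in: Iterator[A])(closeCallback: A => Unit): Iterator[A] =
    new Iterator[A] {
      private var previous: Option[A] = None

      private def releasePrevious(): Unit = {
        previous.foreach(closeCallback)
        previous = None
      }

      override def hasNext: Boolean = {
        releasePrevious()
        in.hasNext
      }

      override def next(): A = {
        releasePrevious()
        val a = in.next()
        previous = Some(a)
        a
      }
    }
}
```

With a batch-like payload the callback would be `_.close()`, while a row-based iterator (as in the C2R case mentioned above) could pass a no-op such as `_ => ()` or skip the wrapper entirely.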
closed = true
}
hasNext
batches.hasNext
It seems we do not call `recyclePayload` for C2R after the refactor, since the iterator is row-based. Is there a leak?
I didn't change any code related to payload closing. The deleted lines handled iterator completion, so they were moved to `recycleIterator`.
I see, it is closed by itself. So it seems C2R does not call `recyclePayload`?
> I see it is closed by itself, then it seems c2r did not call recyclePayload?

Yes, as you can see, the batches are closed manually within some `if-else` conditions. I didn't know how much effort would be needed to refactor these usages to go through `#recyclePayload()`, so I didn't change that part of the code in this patch. We can probably do that in a separate ticket; I am not sure.
===== Performance report for TPCH SF2000 with Velox backend, for reference only ====
@@ -230,11 +230,12 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
  /**
   * Generate closeable ColumnBatch iterator.
   *
   * FIXME: This no longer overrides parent's method
Keep it in `CHIteratorApi`.
Any suggested change? I only removed the `override` modifier, but I'm not sure where to put this method yet.
> Any suggested change? I only removed override modifier. But not sure where to put this method yet.

This change is OK with me.
LGTM. Thanks!
One of the purposes of this patch is to close plan iterators (so that Velox's task gets stopped) before stopping the shuffle writer.
Will do some cleanups of the code.
Changes are listed as follows:
- `Iterators#wrap` for miscellaneous operations on an iterator, e.g. handling the closing of the iterator and its payloads.
- `#recyclePayload`.
- `#recyclePayload`.
- `#recycleIterator`. The method takes early close into account, which means that once the iterator's `hasNext` starts to return `false`, the iterator will be closed. Previously, Velox's out iterator was only closed when the task ended.
- `NativeMemoryManager#hold()` to make the memory manager hold references to all created memory pools, so that they do not become inaccessible after the task gets early-closed.

This will also fix some metrics issues when the SQL `limit` clause is used, since we rely on the `#hasNext` result to collect iterator metrics.
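The early-close behavior described for `#recycleIterator` can be sketched as below. This is an assumption-laden illustration rather than the patch's implementation: `EarlyCloseSketch` and `completionCallback` are hypothetical names, and the sketch leaves out the task-completion fallback that would still be needed when a consumer stops reading before `hasNext` returns `false`.

```scala
object EarlyCloseSketch {
  // Runs completionCallback exactly once, the first time hasNext returns false.
  def recycleIterator[A](in: Iterator[A])(completionCallback: => Unit): Iterator[A] =
    new Iterator[A] {
      private var completed = false

      override def hasNext: Boolean =
        if (completed) {
          false
        } else {
          val more = in.hasNext
          if (!more) {
            completed = true
            completionCallback // early close: release resources right away
          }
          more
        }

      override def next(): A = in.next()
    }
}
```

A possible use is `EarlyCloseSketch.recycleIterator(source) { nativeHandle.close() }`, where `nativeHandle` stands for whatever resource backs the iterator. Since the callback fires from `hasNext`, anything that depends on end-of-stream, such as metrics collection, would run at the point the stream is exhausted instead of at task end.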