[GLUTEN-5414][VL] FEAT: Support read CSV #5447
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on GitHub Issues? https://github.com/apache/incubator-gluten/issues Then could you also rename the commit message and pull request title in the following format?
See also:
import org.apache.spark.TaskContext

class GenericRetainer[T <: AutoCloseable] {
  private var retained: Option[T] = None

  // Hold a single batch; retaining a second one before release() is a bug.
  def retain(batch: T): Unit = {
    if (retained.isDefined) {
      throw new IllegalStateException
    }
    retained = Some(batch)
  }

  // Close and drop the currently retained batch, if any.
  def release(): Unit = {
    retained.foreach(b => b.close())
    retained = None
  }
}

class UnsafeItr[T <: AutoCloseable](delegate: Iterator[T]) extends Iterator[T] {
  val holder = new GenericRetainer[T]()

  // Ensure the last outstanding batch is closed when the task finishes.
  addLeakSafeTaskCompletionListener[Unit](
    (_: TaskContext) => {
      holder.release()
    })

  override def hasNext: Boolean = {
    // Close the batch handed out by the previous next() before advancing.
    holder.release()
    val hasNext = delegate.hasNext
    hasNext
  }

  override def next(): T = {
    val b = delegate.next()
    holder.retain(b)
    b
  }
}
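For context, a minimal sketch of how such a wrapper would be used, assuming it runs inside a Spark task (the constructor registers a task-completion listener); FakeBatch is a stand-in payload invented for illustration:

// A toy AutoCloseable standing in for a columnar batch.
class FakeBatch(val id: Int) extends AutoCloseable {
  override def close(): Unit = println(s"closed batch $id")
}

// Each batch handed out by next() is retained by the holder and closed
// automatically on the following hasNext() call (or on task completion),
// so downstream code never closes batches itself.
val itr = new UnsafeItr(Iterator(new FakeBatch(1), new FakeBatch(2)))
while (itr.hasNext) {
  println(s"processing batch ${itr.next().id}")
}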
There are some new APIs that could replace this sort of wrapper.
override def isSplitable(
    sparkSession: SparkSession,
    options: Map[String, String],
    path: Path): Boolean = {
  true
}
Perhaps using super.isSplitable(sparkSession, options, path) would be better.
The super implementation returns false, but here it is true. When the compression codec is not empty the file cannot be split; I will refactor this.
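A minimal sketch of the refactor being discussed, assuming Spark's FileFormat.isSplitable signature and Hadoop's compression codec API; this is illustrative, not the final change:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.spark.sql.SparkSession

// Splittable only when the path has no compression codec, or the codec
// itself supports splitting (e.g. bzip2).
override def isSplitable(
    sparkSession: SparkSession,
    options: Map[String, String],
    path: Path): Boolean = {
  val codec = new CompressionCodecFactory(sparkSession.sessionState.newHadoopConf())
    .getCodec(path)
  codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}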
Can you paste a UI diagram in the first comment?
@@ -458,6 +458,17 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
    }
  }

  test("csv scan") {
Does it support CSV with compression like snappy?
Yes, this PR supports compression codecs: apache/arrow#9685
We need to add a shaded-jar exclude for the Arrow dataset.
We also need to add HDFS support for the Arrow dataset. But even after enabling ARROW_HDFS, there are still issues when accessing CSV in HDFS.
This may be a protobuf version issue; I will try to test CSV on HDFS.
After enabling HDFS in Arrow, I can successfully read CSV files from HDFS.
This is my local protobuf version; I'm not sure if the issue is related to the protobuf version. @liujiayi771
@jinchengchenghh
Can you help merge it? Thanks! @zhztheplayer
👍
@jinchengchenghh can you do a test of the performance improvement? CSV parser only, and CSV + Gluten. You may create a large table and run TPCH Q6.
TPCH SF2000 Q6 performance, query on lineitem, data size: 622G
Test config:
Test script:
Note: because the file schema should match the Arrow schema, we should specify the schema by …
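Since the test script itself is not reproduced above, here is a hedged sketch of what it might look like: reading the lineitem CSV with an explicit schema (per the note) and running a TPCH Q6-style query; the path, delimiter, and column list are assumptions:

// Explicit schema so it matches the Arrow schema derived from the file.
val lineitem = spark.read
  .option("delimiter", "|")
  .schema("l_quantity double, l_extendedprice double, l_discount double, l_shipdate date")
  .csv("hdfs:///tpch-sf2000/lineitem") // placeholder path
lineitem.createOrReplaceTempView("lineitem")

// TPC-H Q6-style aggregation over the CSV-backed table.
spark.sql("""
  SELECT sum(l_extendedprice * l_discount) AS revenue
  FROM lineitem
  WHERE l_shipdate >= date '1994-01-01'
    AND l_shipdate < date '1995-01-01'
    AND l_discount BETWEEN 0.05 AND 0.07
    AND l_quantity < 24
""").show()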
Thank you, Chengcheng. What's the vanilla Spark performance in this case? And how many task threads did you use?
This PR uses Arrow's CSV reader to parse the CSV file, then feeds the Arrow-format data into the Velox pipeline.
In the query plan, an ArrowFileScan arrowcsv node indicates the file format has already been changed to Arrow. If the specified schema differs from the file schema, the scan falls back to vanilla Spark to generate UnsafeRow, which is then converted to a Velox ColumnarBatch and then to an ArrowRecordBatch, because we don't have a row-to-Arrow converter yet; supportsBatch being true means we should output an Arrow ColumnarBatch.
This PR introduces protobuf to compile the JNI code (https://github.com/apache/arrow/pull/36929/files), but the higher protobuf version will cause UnsatisfiedLinkError: /tmp/jnilib-4372912739792055919.tmp: undefined symbol: _ZTIN6google8protobuf7MessageE, so we need to compile the Arrow Java dataset module.
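To illustrate the fallback condition described above, a hedged sketch of checking whether the scan was offloaded; the node name ArrowFileScan arrowcsv is taken from the plan output quoted in this thread, while the schema and path are placeholders:

val df = spark.read
  .schema("id int, name string") // matches the file schema: scan stays in Arrow
  .csv("/tmp/example.csv")       // placeholder path

// When offloaded, the physical plan should contain "ArrowFileScan arrowcsv";
// a mismatched schema instead falls back to the vanilla Spark CSV scan.
df.explain()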