
[GLUTEN-5414][VL] FEAT: Support read CSV #5447

Merged · zhouyuan merged 7 commits into apache:main on May 8, 2024
Conversation

@jinchengchenghh (Contributor) commented Apr 18, 2024

This PR uses Arrow's CSV reader to parse the CSV file and then feeds the Arrow-format data into the Velox pipeline.
Query plan:

*(1) ColumnarToRow
+- ArrowFileScan arrowcsv [Name#17,Language#18] Batched: true, DataFilters: [], Format: org.apache.gluten.datasource.ArrowCSVFileFormat@7772ec28, Location: InMemoryFileIndex(1 paths)[file:/mnt/DP_disk1/code/incubator-gluten/backends-velox/target/scala-2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Name:string,Language:string>


VeloxColumnarToRowExec
+- ^(2) FilterExecTransformer (isnotnull(Name#17) AND (Name#17 = Peter))
   +- ^(2) InputIteratorTransformer[Name#17, Language#18]
      +- ^(2) InputAdapter
         +- ^(2) ArrowFileScan arrowcsv [Name#17,Language#18] Batched: true, DataFilters: [isnotnull(Name#17), (Name#17 = Pet

The ArrowFileScan arrowcsv node indicates that the file format has already been changed to Arrow.

If the specified schema differs from the file schema, the scan falls back to vanilla Spark to generate UnsafeRow, which is then converted to a Velox ColumnarBatch and then to an ArrowRecordBatch, because we do not yet have a row-to-Arrow converter, and supportsBatch = true means we must output an Arrow ColumnarBatch.

This PR introduces protobuf to compile the JNI code (https://github.com/apache/arrow/pull/36929/files), but a higher protobuf version causes UnsatisfiedLinkError: /tmp/jnilib-4372912739792055919.tmp: /tmp/jnilib-4372912739792055919.tmp: undefined symbol: _ZTIN6google8protobuf7MessageE, so we need to compile the Arrow Java dataset module ourselves.
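
A minimal spark-shell sketch of how the new scan path shows up (the file path is hypothetical; any CSV with a header row works):

// Hypothetical local CSV file with a header row.
val df = spark.read.format("csv").option("header", "true").load("file:///tmp/student.csv")

// With this PR the scan node appears as ArrowFileScan arrowcsv, feeding
// Arrow record batches into the Velox pipeline.
print(df.queryExecution.executedPlan)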


Thanks for opening a pull request!

Could you open an issue for this pull request on Github Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename commit message and pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}


Comment on lines 222 to 257
// Holds at most one AutoCloseable at a time and closes it on release.
class GenericRetainer[T <: AutoCloseable] {
  private var retained: Option[T] = None

  def retain(batch: T): Unit = {
    if (retained.isDefined) {
      throw new IllegalStateException
    }
    retained = Some(batch)
  }

  // Close and drop the currently retained element, if any.
  def release(): Unit = {
    retained.foreach(b => b.close())
    retained = None
  }
}

// Iterator wrapper that closes the previously returned element before
// advancing, and releases the last one on task completion.
class UnsafeItr[T <: AutoCloseable](delegate: Iterator[T]) extends Iterator[T] {
  val holder = new GenericRetainer[T]()

  addLeakSafeTaskCompletionListener[Unit](
    (_: TaskContext) => {
      holder.release()
    })

  override def hasNext: Boolean = {
    holder.release()
    val hasNext = delegate.hasNext
    hasNext
  }

  override def next(): T = {
    val b = delegate.next()
    holder.retain(b)
    b
  }
}

@zhouyuan zhouyuan changed the title [VL] Support read CSV [GLUTEN-5414][VL] Support read CSV Apr 18, 2024

#5414

sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = {
true

Perhaps using super.isSplitable(sparkSession, options, path) would be better.

@jinchengchenghh (Author) replied:
super returns false, but here it is true. When the codec is not empty, the file cannot be split; I will refactor this.
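
A possible shape of that refactor (a sketch only, not the code in this PR; the helper name is hypothetical), mirroring vanilla Spark's behaviour of splitting only when no compression codec applies to the path:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.spark.sql.SparkSession

// Sketch: a CSV file is splittable only when it is not compressed.
def isSplitableSketch(sparkSession: SparkSession, path: Path): Boolean = {
  val conf = sparkSession.sparkContext.hadoopConfiguration
  new CompressionCodecFactory(conf).getCodec(path) == null
}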


@FelixYBW (Contributor): Can you paste a UI diagram in the first comment?

@jinchengchenghh jinchengchenghh marked this pull request as draft April 18, 2024 23:35
@@ -458,6 +458,17 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
}
}

test("csv scan") {

Does it support CSV with compression like snappy?

@jinchengchenghh (Author) replied Apr 19, 2024:
Yes, this PR supports compression codecs (apache/arrow#9685).
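
A minimal usage sketch (hypothetical file path), assuming the compression codec is inferred from the file extension as in vanilla Spark:

// Hypothetical path to a gzip-compressed CSV file with a header row.
val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("file:///tmp/student.csv.gz")
df.show()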


@zhouyuan zhouyuan changed the title [GLUTEN-5414][VL] Support read CSV [GLUTEN-5414][VL] FEAT: Support read CSV Apr 19, 2024
@jinchengchenghh jinchengchenghh marked this pull request as ready for review April 23, 2024 23:51

@liujiayi771 (Contributor) commented May 7, 2024:

We need to add a shade exclusion for the Arrow dataset module; otherwise, the Arrow JNI cannot find the correct dataset Java methods.

@liujiayi771 (Contributor) commented May 7, 2024:

We also need to add HDFS support for the Arrow dataset module.

caused by: java.lang.RuntimeException: Got HDFS URI but Arrow compiled without HDFS support                                                                                                                                  
        at org.apache.arrow.dataset.file.JniWrapper.makeFileSystemDatasetFactory(Native Method)                                                                                                                              
        at org.apache.arrow.dataset.file.FileSystemDatasetFactory.createNative(FileSystemDatasetFactory.java:40)                                                                                                             
        at org.apache.arrow.dataset.file.FileSystemDatasetFactory.<init>(FileSystemDatasetFactory.java:31)                                                                                                                   
        at org.apache.gluten.utils.ArrowUtil$.makeArrowDiscovery(ArrowUtil.scala:149)                                                                                                                                        
        at org.apache.gluten.datasource.ArrowCSVFileFormat.$anonfun$buildReaderWithPartitionValues$3(ArrowCSVFileFormat.scala:281)                                                                                           
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:231)                                               
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)                                                                                                                
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)

But even after enabling ARROW_HDFS, there are still issues when accessing csv in HDFS.

[libprotobuf ERROR google/protobuf/descriptor_database.cc:642] File already exists in database: Security.proto                                                                                                               
[libprotobuf FATAL google/protobuf/descriptor.cc:1986] CHECK failed: GeneratedDatabase()->Add(encoded_file_descriptor, size):                                                                                                
terminate called after throwing an instance of 'google::protobuf::FatalException'                                                                                                                                            
  what():  CHECK failed: GeneratedDatabase()->Add(encoded_file_descriptor, size):   


@jinchengchenghh (Author) replied:
> We also need to add HDFS support for arrow dataset. [...] But even after enabling ARROW_HDFS, there are still issues when accessing csv in HDFS.

This may be related to the protobuf version; I will try to test CSV in HDFS.


@jinchengchenghh (Author):

After enabling HDFS in Arrow, I can successfully read a CSV file from HDFS.

scala> val filePath = "/input/student.csv"
filePath: String = /input/student.csv

scala> val df = spark.read.format("csv").option("header", "true").load(filePath)
E0508 10:25:42.841267 3005137 Exceptions.h:69] Line: /mnt/DP_disk1/code/incubator-gluten/ep/build-velox/build/velox_ep/velox/exec/Task.cpp:1850, Function:terminate, Expression:  Cancelled, Source: RUNTIME, ErrorCode: INVALID_STATE
df: org.apache.spark.sql.DataFrame = [Name: string, Language: string]

scala>
     | df.show()
+-----+--------+
| Name|Language|
+-----+--------+
| Juno|    Java|
|Peter|  Python|
|Celin|     C++|
+-----+--------+


scala> print(df.queryExecution.executedPlan)
*(1) ColumnarToRow
+- ArrowFileScan arrowcsv [Name#17,Language#18] Batched: true, DataFilters: [], Format: org.apache.gluten.datasource.ArrowCSVFileFormat@485f3327, Location: InMemoryFileIndex(1 paths)[hdfs://0.0.0.0:9000/input/student.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Name:string,Language:string>


My local protobuf version is below; I'm not sure whether the issue is related to the protobuf version. @liujiayi771

root@sr249:/mnt/DP_disk2/tpcds/scripts# protoc --version
libprotoc 3.21.4

@liujiayi771 (Contributor): @jinchengchenghh I have successfully read CSV files in HDFS. Thanks.

@jinchengchenghh (Author): Can you help merge it? Thanks! @zhztheplayer

@zhouyuan (Contributor) left a review comment: 👍

@zhouyuan zhouyuan merged commit 32775f8 into apache:main May 8, 2024
43 checks passed
@FelixYBW (Contributor) commented May 8, 2024:

@jinchengchenghh can you run a performance comparison: CSV parsing only, and CSV + Gluten? You may create a large table and run TPCH Q6.

@jinchengchenghh (Author) commented May 11, 2024:

TPCH SF2000 Q6 performance, query:
select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= '1994-01-01' and l_shipdate < '1995-01-01' and l_discount between .06 - 0.01 and .06 + 0.01 and l_quantity < 24

lineitem data: 622G

gluten without native reader | gluten native csv reader | vanilla Spark
8333                         | 2456                     | 8385

Test config:

--num-executors 18 \
  --driver-memory 20g \
  --executor-cores 8 \
  --executor-memory 4g \
  --master local[1] \
  --deploy-mode client \
  --conf spark.executor.memoryOverhead=1g \

Test script:

val schema = new StructType()
  .add("l_orderkey", LongType)
  .add("l_partkey", LongType)
  .add("l_suppkey", LongType)
  .add("l_linenumber", LongType)
  .add("l_quantity", DoubleType)
  .add("l_extendedprice", DoubleType)
  .add("l_discount", DoubleType)
  .add("l_tax", DoubleType)
  .add("l_returnflag", StringType)
  .add("l_linestatus", StringType)
  .add("l_shipdate", DateType)
  .add("l_commitdate", DateType)
  .add("l_receiptdate", DateType)
  .add("l_shipinstruct", StringType)
  .add("l_shipmode", StringType)
  .add("l_comment", StringType)

val lineitem = spark.read.format("csv").option("header","true").schema(schema).load("file:///mnt/DP_disk2/tpch/csvdata/")
spark.sql(q6)

Note: because the file schema must match the Arrow schema, the schema should be specified explicitly with .schema(arrow_matched_schema).
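
For completeness, a hypothetical way to wire the query into the script above; the query text is the one quoted at the top of this comment, and the temp-view registration is an assumption:

// Register the CSV-backed DataFrame so the SQL below can reference it.
lineitem.createOrReplaceTempView("lineitem")

val q6 =
  """select sum(l_extendedprice * l_discount) as revenue
    |from lineitem
    |where l_shipdate >= '1994-01-01' and l_shipdate < '1995-01-01'
    |  and l_discount between .06 - 0.01 and .06 + 0.01
    |  and l_quantity < 24""".stripMargin

spark.sql(q6).show()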

@FelixYBW (Contributor):

Thank you, Chengcheng. What's the vanilla spark performance in this case? And how many task threads did you use?
