// tpch_parquet.scala (from a fork of apache/incubator-gluten)
// Runs the TPC-H queries against Parquet-backed tables from a Spark shell.
import org.apache.spark.sql.execution.debug._ // enables .debug()/.debugCodegen() for interactive plan inspection
import scala.io.Source
import java.io.File
// Configuration: set these two paths before running.
var parquet_file_path = "/PATH/TO/TPCH_PARQUET_PATH"
var gluten_root = "/PATH/TO/GLUTEN"
// Run a block and print its wall-clock time.
def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) / 1000000000.0 + " seconds")
  result
}
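// Usage sketch: wrap any expression to time it, e.g.
//   time { spark.sql("select 1").show }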
// Read the TPC-H tables from Parquet files.
val lineitem = spark.read.format("parquet").load("file://" + parquet_file_path + "/lineitem")
val part = spark.read.format("parquet").load("file://" + parquet_file_path + "/part")
val orders = spark.read.format("parquet").load("file://" + parquet_file_path + "/orders")
val customer = spark.read.format("parquet").load("file://" + parquet_file_path + "/customer")
val supplier = spark.read.format("parquet").load("file://" + parquet_file_path + "/supplier")
val partsupp = spark.read.format("parquet").load("file://" + parquet_file_path + "/partsupp")
val region = spark.read.format("parquet").load("file://" + parquet_file_path + "/region")
val nation = spark.read.format("parquet").load("file://" + parquet_file_path + "/nation")
// Register a temporary view for each TPC-H table.
lineitem.createOrReplaceTempView("lineitem")
orders.createOrReplaceTempView("orders")
customer.createOrReplaceTempView("customer")
part.createOrReplaceTempView("part")
supplier.createOrReplaceTempView("supplier")
partsupp.createOrReplaceTempView("partsupp")
nation.createOrReplaceTempView("nation")
region.createOrReplaceTempView("region")
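// Optional sanity check once the views exist, e.g.:
//   spark.sql("select count(*) from lineitem").show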
// List the query files in a directory (empty list if the directory does not exist).
def getListOfFiles(dir: String): List[File] = {
  val d = new File(dir)
  if (d.exists && d.isDirectory) {
    // To run a single query, filter by file name, e.g.:
    // d.listFiles.filter(_.isFile).filter(_.getName().contains("17.sql")).toList
    d.listFiles.filter(_.isFile).toList
  } else {
    List[File]()
  }
}
val fileLists = getListOfFiles(gluten_root + "/backends-velox/workload/tpch/tpch.queries.updated/")
// Sort the query files numerically by query number (an "a"/"b" suffix sorts as .1/.2),
// e.g. "q2.sql" -> 2.0 runs before "q10.sql" -> 10.0.
val sorted = fileLists.sortBy { f =>
  f.getName
    .replaceFirst("a", ".1")
    .replaceFirst("b", ".2")
    .replaceFirst("\\.sql", "")
    .replaceFirst("q", "")
    .toDouble
}
// Main loop: run each TPC-H query, printing its result and elapsed time.
for (t <- sorted) {
  println(t)
  // Strip "--" comment lines and collapse the query into a single line.
  val fileContents = Source.fromFile(t).getLines.filter(!_.startsWith("--")).mkString(" ")
  println(fileContents)
  try {
    time { spark.sql(fileContents).show }
    // To inspect the plan instead: spark.sql(fileContents).explain
    Thread.sleep(2000)
  } catch {
    case e: Exception => println(s"Query ${t.getName} failed: ${e.getMessage}")
  }
}
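// One way to launch this script (assuming a Gluten-enabled Spark build):
//   spark-shell -i tpch_parquet.scala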