Skip to content

Commit

Permalink
Enable hive support in spark application (opensearch-project#1845)
Browse files Browse the repository at this point in the history
* Initial spark application draft

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove temp table

Signed-off-by: Rupal Mahajan <[email protected]>

* Add license header

Signed-off-by: Rupal Mahajan <[email protected]>

* Add scalastyle-config and update readme

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix datatype for result and schema

Signed-off-by: Rupal Mahajan <[email protected]>

* Add test

Signed-off-by: Rupal Mahajan <[email protected]>

* Simplify code using toJSON.collect.toList

Signed-off-by: Rupal Mahajan <[email protected]>

* Add example in readme

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix triple quotes issue

Signed-off-by: Rupal Mahajan <[email protected]>

* Update method name and description

Signed-off-by: Rupal Mahajan <[email protected]>

* Add applicationId

Signed-off-by: Rupal Mahajan <[email protected]>

* Enable Hive Support

Signed-off-by: Rupal Mahajan <[email protected]>

* Set flint config

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix json parsing error

Signed-off-by: Rupal Mahajan <[email protected]>

---------

Signed-off-by: Rupal Mahajan <[email protected]>
  • Loading branch information
rupal-bq authored Jul 19, 2023
1 parent f4883c9 commit a34ac9b
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession, Row}
import org.apache.spark.sql.types._

Expand All @@ -31,8 +32,17 @@ object SQLJob {
val auth = args(5)
val region = args(6)

val conf: SparkConf = new SparkConf()
.setAppName("SQLJob")
.set("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
.set("spark.datasource.flint.host", host)
.set("spark.datasource.flint.port", port)
.set("spark.datasource.flint.scheme", scheme)
.set("spark.datasource.flint.auth", auth)
.set("spark.datasource.flint.region", region)

// Create a SparkSession
val spark = SparkSession.builder().appName("SQLJob").getOrCreate()
val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

try {
// Execute SQL query
Expand Down Expand Up @@ -89,7 +99,7 @@ object SQLJob {

// Create the data rows
val rows = Seq((
result.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
result.toJSON.collect.toList.map(_.replaceAll("'", "\\\\'").replaceAll("\"", "'")),
resultSchema.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
sys.env.getOrElse("EMR_STEP_ID", "unknown"),
spark.sparkContext.applicationId))
Expand Down

0 comments on commit a34ac9b

Please sign in to comment.