Skip to content

Commit

Permalink
Reformat Snowflake files
Browse files Browse the repository at this point in the history
  • Loading branch information
sali-a committed Apr 16, 2024
1 parent 18f8c1a commit 043586d
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
* @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
*/
case class SnowflakeInput(
options: Map[String, String],
config: Config = ConfigFactory.empty()
) extends Input
with Logging {
options: Map[String, String],
config: Config = ConfigFactory.empty()
) extends Input
with Logging {

val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"

Expand All @@ -27,7 +27,7 @@ case class SnowflakeInput(
* @throws Exception If the exactly one of the dateRange/dateColumn fields is None.
*/
override def read(implicit spark: SparkSession): DataFrame = {
spark.read.format(SNOWFLAKE_CONNECTOR_NAME).options(options).load()
spark.read.format(SNOWFLAKE_CONNECTOR_NAME).options(options).load()
}

}
Expand All @@ -42,6 +42,6 @@ object SnowflakeInput {
* @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
*/
def apply(implicit config: Config): SnowflakeInput = {
SnowflakeInput(options = getOptions , config = config)
SnowflakeInput(options = getOptions, config = config)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ import org.apache.spark.sql.{Dataset, SparkSession}
* @param config the config object.
*/
case class SnowflakeOutput(
mode: String,
options: Map[String, String],
config: Config = ConfigFactory.empty()
) extends Output
with Logging {
mode: String,
options: Map[String, String],
config: Config = ConfigFactory.empty()
) extends Output
with Logging {

val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"


/**
* Writes data to this output.
*
Expand Down Expand Up @@ -54,4 +53,4 @@ object SnowflakeOutput {
SnowflakeOutput(mode = mode, options = getOptions, config = config)
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ class SnowflakeInputTest extends AnyWordSpec with Matchers {
val config = ConfigFactory.parseMap(
Map(
"Input" -> Map(
"Name" -> "my-test-snowflake",
"Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput",
"Options" -> Map(
"Name" -> "my-test-snowflake",
"Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput",
"Options" -> Map(
"sfDatabase" -> "TESTDATABASE",
"sfSchema" -> "TESTSCHEMA",
"sfUser" -> "TESTUSER",
Expand Down Expand Up @@ -43,8 +43,8 @@ class SnowflakeInputTest extends AnyWordSpec with Matchers {
val config = ConfigFactory.parseMap(
Map(
"Input" -> Map(
"Name" -> "my-test-snowflake",
"Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput",
"Name" -> "my-test-snowflake",
"Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput"
)
)
)
Expand All @@ -55,4 +55,4 @@ class SnowflakeInputTest extends AnyWordSpec with Matchers {

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.typesafe.config.{ConfigException, ConfigFactory}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec


class SnowflakeOutputTest extends AnyWordSpec with Matchers {

"SnowflakeOutput" should {
Expand All @@ -14,16 +13,16 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers {
val config = ConfigFactory.parseMap(
Map(
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput",
"Name" -> "my-test-snowflake",
"Mode" -> "append",
"Options" -> Map(
"dbTable" -> "test-table",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfWarehouse" -> "warehouse",
"Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput",
"Name" -> "my-test-snowflake",
"Mode" -> "append",
"Options" -> Map(
"dbTable" -> "test-table",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfWarehouse" -> "warehouse",
"column_mapping" -> "name"
)
)
Expand All @@ -33,12 +32,12 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers {
val snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output"))

val expectedSnowflakeOptions = Map(
"dbTable" -> "test-table",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfWarehouse" -> "warehouse",
"dbTable" -> "test-table",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfWarehouse" -> "warehouse",
"column_mapping" -> "name"
)

Expand All @@ -50,16 +49,16 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers {

val config = ConfigFactory.parseMap(
Map(
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput",
"Name" -> "my-test-snowflake",
"Options" -> Map(
"dbTable" -> "test-table",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfWarehouse" -> "warehouse",
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput",
"Name" -> "my-test-snowflake",
"Options" -> Map(
"dbTable" -> "test-table",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfWarehouse" -> "warehouse",
"column_mapping" -> "name"
)
)
Expand All @@ -74,9 +73,9 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers {

val config = ConfigFactory.parseMap(
Map(
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput",
"Name" -> "my-test-snowflake",
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.SnowflakeOutput",
"Name" -> "my-test-snowflake"
)
)
)
Expand All @@ -85,7 +84,6 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers {
intercept[ConfigException](snowflakeStreamOutput)
}


}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers {
"Timeout" -> "24",
"Options" -> Map(
"dbtable" -> "test-table",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfWarehouse" -> "test-warehouse",
"sfRole" -> "tester"
)
Expand Down Expand Up @@ -67,10 +67,10 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers {
"AddTimestampOnInsert" -> true,
"Options" -> Map(
"dbtable" -> "test-table",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfWarehouse" -> "test-warehouse",
"sfRole" -> "tester"
)
Expand Down Expand Up @@ -111,10 +111,10 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers {
"AddTimestampOnInsert" -> false,
"Options" -> Map(
"dbtable" -> "test-table",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfWarehouse" -> "test-warehouse",
"sfRole" -> "tester"
)
Expand All @@ -126,10 +126,10 @@ class SnowflakeOutputTest extends AnyWordSpec with Matchers {

val expectedSnowflakeOptions = Map(
"dbtable" -> "test-table",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfUrl" -> "http://snowflake.com",
"sfUser" -> "my-user",
"sfDatabase" -> "db",
"sfSchema" -> "test-schema",
"sfWarehouse" -> "test-warehouse",
"sfRole" -> "tester"
)
Expand Down

0 comments on commit 043586d

Please sign in to comment.