Add Snowflake connector (#12)
* Add Snowflake batch feature

* Add Snowflake Streaming output class

* Add documentation for Snowflake Streaming

* Reformat Snowflake files

* Add Spark-snowflake connector as dependency
sali-a authored Apr 25, 2024
1 parent ff7de7b commit 6dacc29
Showing 11 changed files with 594 additions and 1 deletion.
3 changes: 2 additions & 1 deletion build.sbt
@@ -16,7 +16,8 @@ ThisBuild / libraryDependencies ++= Seq(
// Spark
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion.value,
"org.apache.spark" %% "spark-sql" % sparkVersion.value,
"org.apache.spark" %% "spark-core" % sparkVersion.value
"org.apache.spark" %% "spark-core" % sparkVersion.value,
"net.snowflake" %% "spark-snowflake" % f"2.15.0-spark_3.4"
)

// Tests configuration
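
Note on the new dependency: the spark-snowflake artifact encodes the targeted Spark minor version in its suffix, so 2.15.0-spark_3.4 targets the Spark 3.4.x line. A possible sketch (not part of this commit, and assuming sparkVersion.value has the usual major.minor.patch form) that derives the suffix from the existing sparkVersion setting:

  // Sketch only: keep the connector suffix aligned with the Spark minor version.
  "net.snowflake" %% "spark-snowflake" % s"2.15.0-spark_${sparkVersion.value.split('.').take(2).mkString(".")}"

Whether a 2.15.0 build is published for every Spark suffix would still need to be checked against the available artifacts.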
New file: SnowflakeInput.scala (com.amadeus.dataio.pipes.snowflake.batch)
@@ -0,0 +1,47 @@
package com.amadeus.dataio.pipes.snowflake.batch

import com.amadeus.dataio.core.{Input, Logging}
import com.amadeus.dataio.config.fields._
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* Reads batches of data from Snowflake.
*
* @param options the Snowflake connector options.
* @param config the Typesafe Config object that was used at instantiation to configure this entity.
*/
case class SnowflakeInput(
options: Map[String, String],
config: Config = ConfigFactory.empty()
) extends Input
with Logging {

val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"

/**
* Reads a batch of data from Snowflake.
*
* @param spark The SparkSession which will be used to read the data.
* @return The data that was read.
*/
override def read(implicit spark: SparkSession): DataFrame = {
spark.read.format(SNOWFLAKE_CONNECTOR_NAME).options(options).load()
}

}

object SnowflakeInput {

/**
* Creates a new instance of SnowflakeInput from a typesafe Config object.
*
* @param config typesafe Config object containing the configuration fields.
* @return a new SnowflakeInput object.
* @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
*/
def apply(implicit config: Config): SnowflakeInput = {
SnowflakeInput(options = getOptions, config = config)
}
}
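
Usage illustration (a minimal sketch, not part of this commit): the HOCON layout mirrors the test further down, and every option value below is a hypothetical placeholder.

  import com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput
  import com.typesafe.config.ConfigFactory
  import org.apache.spark.sql.SparkSession

  // Sketch only: all option values are hypothetical placeholders.
  val config = ConfigFactory.parseString("""
    Input {
      Name = "my-snowflake-input"
      Type = "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput"
      Options {
        sfUrl      = "myaccount.snowflakecomputing.com"
        sfUser     = "MY_USER"
        sfDatabase = "MY_DB"
        sfSchema   = "MY_SCHEMA"
        dbtable    = "MY_TABLE"
      }
    }
  """)

  implicit val spark: SparkSession = SparkSession.builder().getOrCreate()

  // The companion apply reads the Options block; read delegates to the Spark connector.
  val input = SnowflakeInput(config.getConfig("Input"))
  val df    = input.read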
New file: SnowflakeOutput.scala (com.amadeus.dataio.pipes.snowflake.batch)
@@ -0,0 +1,56 @@
package com.amadeus.dataio.pipes.snowflake.batch

import com.amadeus.dataio.config.fields._
import com.amadeus.dataio.core.{Logging, Output}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.{Dataset, SparkSession}

/**
* Writes batches of data to Snowflake.
*
* @param mode the Spark save mode to use (e.g. append, overwrite).
* @param options the Snowflake connector options.
* @param config the config object.
*/
case class SnowflakeOutput(
mode: String,
options: Map[String, String],
config: Config = ConfigFactory.empty()
) extends Output
with Logging {

val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"

/**
* Writes data to this output.
*
* @param data The data to write.
* @param spark The SparkSession which will be used to write the data.
*/
override def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {

data.write
.format(SNOWFLAKE_CONNECTOR_NAME)
.options(options)
.mode(mode)
.save()
}
}

object SnowflakeOutput {

/**
* Creates a new instance of SnowflakeOutput from a typesafe Config object.
*
* @param config typesafe Config object containing the configuration fields.
* @return a new SnowflakeOutput object.
* @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
*/
def apply(implicit config: Config): SnowflakeOutput = {

val mode = config.getString("Mode")

SnowflakeOutput(mode = mode, options = getOptions, config = config)
}

}
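
Usage illustration (a minimal sketch, not part of this commit): the output can also be instantiated directly instead of going through the config factory; df stands for any DataFrame already loaded, and an implicit SparkSession is assumed to be in scope.

  // Sketch only: option values are hypothetical placeholders;
  // assumes an implicit SparkSession and a DataFrame `df` are in scope.
  val output = SnowflakeOutput(
    mode = "append",
    options = Map(
      "sfUrl"      -> "myaccount.snowflakecomputing.com",
      "sfUser"     -> "MY_USER",
      "sfDatabase" -> "MY_DB",
      "sfSchema"   -> "MY_SCHEMA",
      "dbtable"    -> "MY_TABLE"
    )
  )
  output.write(df)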
New file: SnowflakeOutput.scala (com.amadeus.dataio.pipes.snowflake.streaming)
@@ -0,0 +1,137 @@
package com.amadeus.dataio.pipes.snowflake.streaming

import com.amadeus.dataio.core.{Logging, Output}
import org.apache.spark.sql.streaming.Trigger
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import scala.util.Try

/**
* Writes streaming data to Snowflake.
*
* \!/ It uses the Snowflake connector's batch writer via foreachBatch and therefore guarantees only at-least-once delivery. \!/
*
* @param timeout the streaming query timeout, in milliseconds.
* @param trigger the trigger to be used for the streaming query.
* @param mode the Spark save mode to use for each micro-batch.
* @param options the Snowflake connector options.
* @param addTimestampOnInsert if true, a timestamp column with the current timestamp will be added to the data.
* @param config the config object.
* @param outputName the output name used to define the streaming query name.
*/
case class SnowflakeOutput(
timeout: Long,
trigger: Option[Trigger],
mode: String,
options: Map[String, String],
addTimestampOnInsert: Boolean,
config: Config = ConfigFactory.empty(),
outputName: Option[String]
) extends Output
with Logging {

val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"

/**
* Writes data to this output.
*
* @param data The data to write.
* @param spark The SparkSession which will be used to write the data.
*/
def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {

val dbTable: Option[String] = options.get("dbtable")
dbTable.foreach(table => logger.info(s"Writing dataframe to snowflake table ${table}"))
trigger.foreach(trigger => logger.info(s"Using trigger ${trigger}"))

logger.info(s"Add timestamp on insert: $addTimestampOnInsert")

val queryName = createQueryName()

var streamWriter = data.writeStream.queryName(queryName)

streamWriter = trigger match {
case Some(trigger) => streamWriter.trigger(trigger)
case _ => streamWriter
}

streamWriter.foreachBatch((batchDF: Dataset[T], _: Long) => {
addTimestampOnInsert(batchDF).write
.format(SNOWFLAKE_CONNECTOR_NAME)
.options(options)
.mode(mode)
.save()
})

val streamingQuery = streamWriter.start()

streamingQuery.awaitTermination(timeout)
streamingQuery.stop()
}

/**
* Add current timestamp to the data if needed.
*
* @param data the dataset to enrich.
* @tparam T the dataset type.
* @return the dataset with the timestamp column if needed.
*/
private def addTimestampOnInsert[T](data: Dataset[T]): DataFrame = {
if (addTimestampOnInsert) {
data.withColumn("timestamp", current_timestamp())
} else {
data.toDF()
}
}

/**
* Creates a unique query name based on the output name and the Snowflake table.
*
* @return a unique query name.
*/
private[streaming] def createQueryName(): String = {

val dbTable: Option[String] = options.get("dbtable")

(outputName, dbTable) match {
case (Some(name), Some(table)) => s"QN_${name}_${table}_${java.util.UUID.randomUUID}"
case (Some(name), None) => s"QN_${name}_${java.util.UUID.randomUUID}"
case (None, Some(table)) => s"QN_${table}_${java.util.UUID.randomUUID}"
case _ => s"QN_${java.util.UUID.randomUUID}"
}

}
}

object SnowflakeOutput {
import com.amadeus.dataio.config.fields._

/**
* Creates a new instance of SnowflakeOutput from a typesafe Config object.
*
* @param config typesafe Config object containing the configuration fields.
* @return a new SnowflakeOutput object.
* @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
*/
def apply(implicit config: Config): SnowflakeOutput = {

val mode = config.getString("Mode")
val addTimestampOnInsert = Try(config.getBoolean("AddTimestampOnInsert")).getOrElse(false)

val duration = Try(config.getString("Duration")).toOption
val trigger = duration.map(Trigger.ProcessingTime)

val name = Try(config.getString("Name")).toOption

SnowflakeOutput(
timeout = getTimeout,
trigger = trigger,
mode = mode,
options = getOptions,
addTimestampOnInsert = addTimestampOnInsert,
config = config,
outputName = name
)
}
}
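
Usage illustration (a minimal sketch, not part of this commit): direct instantiation of the streaming output. snowflakeOptions and streamingDf are placeholders, the timeout is the value handed to awaitTermination (milliseconds), and an implicit SparkSession is assumed to be in scope.

  import org.apache.spark.sql.streaming.Trigger

  // Sketch only: snowflakeOptions and streamingDf are hypothetical placeholders.
  val output = SnowflakeOutput(
    timeout              = 30 * 60 * 1000L,                      // stop the query after 30 minutes
    trigger              = Some(Trigger.ProcessingTime("60 seconds")),
    mode                 = "append",                             // save mode applied to every micro-batch
    options              = snowflakeOptions,
    addTimestampOnInsert = true,                                  // adds a "timestamp" column on insert
    outputName           = Some("snowflake-stream")
  )
  output.write(streamingDf)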
New file: SnowflakeInputTest.scala (com.amadeus.dataio.pipes.snowflake.batch)
@@ -0,0 +1,58 @@
package com.amadeus.dataio.pipes.snowflake.batch

import com.amadeus.dataio.testutils.JavaImplicitConverters._
import com.typesafe.config.{ConfigException, ConfigFactory}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class SnowflakeInputTest extends AnyWordSpec with Matchers {

"SnowflakeInput" should {
"be initialized according to configuration" in {
val config = ConfigFactory.parseMap(
Map(
"Input" -> Map(
"Name" -> "my-test-snowflake",
"Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput",
"Options" -> Map(
"sfDatabase" -> "TESTDATABASE",
"sfSchema" -> "TESTSCHEMA",
"sfUser" -> "TESTUSER",
"sfUrl" -> "snowflake.url.com",
"dbtable" -> "TESTTABLE"
)
)
)
)

val snowflakeInputObj = SnowflakeInput.apply(config.getConfig("Input"))

val expectedSnowflakeOptions = Map(
"sfDatabase" -> "TESTDATABASE",
"sfSchema" -> "TESTSCHEMA",
"sfUser" -> "TESTUSER",
"sfUrl" -> "snowflake.url.com",
"dbtable" -> "TESTTABLE"
)

snowflakeInputObj.options shouldEqual expectedSnowflakeOptions
}

"throw an exception when the mandatory fields are missing" in {
val config = ConfigFactory.parseMap(
Map(
"Input" -> Map(
"Name" -> "my-test-snowflake",
"Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput"
)
)
)

def snowflakeInputObj = SnowflakeInput.apply(config.getConfig("Input"))

intercept[ConfigException](snowflakeInputObj)
}
}

}
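
The commit's remaining test files are not shown in this excerpt. Purely as an illustration, a minimal sketch of an analogous test for the batch SnowflakeOutput, modeled on the input test above (config keys follow its apply method; all values are placeholders):

  package com.amadeus.dataio.pipes.snowflake.batch

  import com.amadeus.dataio.testutils.JavaImplicitConverters._
  import com.typesafe.config.ConfigFactory
  import org.scalatest.matchers.should.Matchers
  import org.scalatest.wordspec.AnyWordSpec

  class SnowflakeOutputTest extends AnyWordSpec with Matchers {

    "SnowflakeOutput" should {
      "be initialized according to configuration" in {
        val config = ConfigFactory.parseMap(
          Map(
            "Output" -> Map(
              "Type"    -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeOutput",
              "Mode"    -> "append",
              "Options" -> Map("dbtable" -> "TESTTABLE")
            )
          )
        )

        val snowflakeOutputObj = SnowflakeOutput.apply(config.getConfig("Output"))

        snowflakeOutputObj.mode shouldEqual "append"
        snowflakeOutputObj.options shouldEqual Map("dbtable" -> "TESTTABLE")
      }
    }
  }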