Add Snowflake connector #12

Merged · 5 commits · Apr 25, 2024
3 changes: 2 additions & 1 deletion build.sbt
@@ -16,7 +16,8 @@ ThisBuild / libraryDependencies ++= Seq(
// Spark
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion.value,
"org.apache.spark" %% "spark-sql" % sparkVersion.value,
"org.apache.spark" %% "spark-core" % sparkVersion.value
"org.apache.spark" %% "spark-core" % sparkVersion.value,
"net.snowflake" %% "spark-snowflake" % f"2.15.0-spark_3.4"
)

// Tests configuration
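Reviewer note: a possible sketch, not part of this PR, for deriving the spark-snowflake suffix from sparkVersion instead of hard-coding "3.4" (it assumes a matching 2.15.0 artifact is published for that Spark minor line):

// build.sbt sketch (assumption): derive the "-spark_X.Y" suffix from sparkVersion
// so the connector artifact follows the Spark minor version automatically.
"net.snowflake" %% "spark-snowflake" % ("2.15.0-spark_" + sparkVersion.value.split('.').take(2).mkString("."))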
@@ -0,0 +1,47 @@
package com.amadeus.dataio.pipes.snowflake.batch

import com.amadeus.dataio.core.{Input, Logging}
import com.amadeus.dataio.config.fields._
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* Allows reading data from Snowflake in batch mode.
*
* @param options the snowflake connector options.
* @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
*/
case class SnowflakeInput(
options: Map[String, String],
config: Config = ConfigFactory.empty()
) extends Input
with Logging {

val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"

/**
* Reads a batch of data from snowflake.
*
* @param spark The SparkSession which will be used to read the data.
* @return The data that was read.
*/
override def read(implicit spark: SparkSession): DataFrame = {
spark.read.format(SNOWFLAKE_CONNECTOR_NAME).options(options).load()
}

}

object SnowflakeInput {

/**
* Creates a new instance of SnowflakeInput from a typesafe Config object.
*
* @param config typesafe Config object containing the configuration fields.
* @return a new SnowflakeInput object.
* @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
*/
def apply(implicit config: Config): SnowflakeInput = {
SnowflakeInput(options = getOptions, config = config)
}
}
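For reviewers, a minimal batch-read sketch. It assumes a local SparkSession, a reachable Snowflake account, and the same Options keys used in the tests below; all values are placeholders, and in practice an authentication option such as sfPassword would also be needed:

import com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

// Placeholder configuration mirroring the test below.
val config = ConfigFactory.parseString("""
Input {
  Name = "my-snowflake-input"
  Type = "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput"
  Options {
    sfUrl = "account.snowflakecomputing.com"
    sfUser = "USER"
    sfDatabase = "DB"
    sfSchema = "SCHEMA"
    dbtable = "MY_TABLE"
  }
}
""")

// read uses the implicit SparkSession and loads via the Snowflake connector.
val df = SnowflakeInput.apply(config.getConfig("Input")).read
df.show()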
@@ -0,0 +1,56 @@
package com.amadeus.dataio.pipes.snowflake.batch

import com.amadeus.dataio.config.fields._
import com.amadeus.dataio.core.{Logging, Output}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.{Dataset, SparkSession}

/**
* Allows writing data to Snowflake in batch mode.
*
* @param mode the save mode to use when writing the data.
* @param options the snowflake connector options.
* @param config the config object.
*/
case class SnowflakeOutput(
mode: String,
options: Map[String, String],
config: Config = ConfigFactory.empty()
) extends Output
with Logging {

val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"

/**
* Writes data to this output.
*
* @param data The data to write.
* @param spark The SparkSession which will be used to write the data.
*/
override def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {

data.write
.format(SNOWFLAKE_CONNECTOR_NAME)
.options(options)
.mode(mode)
.save()
}
}

object SnowflakeOutput {

/**
* Creates a new instance of SnowflakeOutput from a typesafe Config object.
*
* @param config typesafe Config object containing the configuration fields.
* @return a new SnowflakeOutput object.
* @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
*/
def apply(implicit config: Config): SnowflakeOutput = {

val mode = config.getString("Mode")

SnowflakeOutput(mode = mode, options = getOptions, config = config)
}

}
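Likewise, a minimal batch-write sketch, reusing the SparkSession and df from the read sketch above. The top-level "Output" key and the Mode value are illustrative assumptions; the Options keys match those used for SnowflakeInput:

import com.amadeus.dataio.pipes.snowflake.batch.SnowflakeOutput
import com.typesafe.config.ConfigFactory

// Placeholder configuration; Mode maps directly to the Spark save mode.
val outputConfig = ConfigFactory.parseString("""
Output {
  Mode = "append"
  Options {
    sfUrl = "account.snowflakecomputing.com"
    sfUser = "USER"
    sfDatabase = "DB"
    sfSchema = "SCHEMA"
    dbtable = "MY_TABLE"
  }
}
""")

// Writes df to the configured Snowflake table using the batch connector.
SnowflakeOutput.apply(outputConfig.getConfig("Output")).write(df)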
@@ -0,0 +1,137 @@
package com.amadeus.dataio.pipes.snowflake.streaming

import com.amadeus.dataio.core.{Logging, Output}
import org.apache.spark.sql.streaming.Trigger
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import scala.util.Try

/**
* Allows writing streaming data to Snowflake.
*
* /!\ It uses the snowflake connector and therefore guarantees only at-least-once delivery. /!\
*
* @param trigger the trigger to be used for the streaming query.
* @param timeout the streaming query timeout, in milliseconds.
* @param mode the save mode to use when writing each micro-batch.
* @param options the snowflake connector options.
* @param addTimestampOnInsert if true, a timestamp column with the current timestamp will be added to the data.
* @param config the config object.
* @param outputName the output name used to define the streaming query name.
*/
case class SnowflakeOutput(
timeout: Long,
trigger: Option[Trigger],
mode: String,
options: Map[String, String],
addTimestampOnInsert: Boolean,
config: Config = ConfigFactory.empty(),
outputName: Option[String]
) extends Output
with Logging {

val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"

/**
* Writes data to this output.
*
* @param data The data to write.
* @param spark The SparkSession which will be used to write the data.
*/
def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {

val dbTable: Option[String] = options.get("dbtable")
dbTable.foreach(table => logger.info(s"Writing dataframe to snowflake table ${table}"))
trigger.foreach(trigger => logger.info(s"Using trigger ${trigger}"))

logger.info(s"Add timestamp on insert: $addTimestampOnInsert")

val queryName = createQueryName()

var streamWriter = data.writeStream.queryName(queryName)

streamWriter = trigger match {
case Some(trigger) => streamWriter.trigger(trigger)
case _ => streamWriter
}

streamWriter.foreachBatch((batchDF: Dataset[T], _: Long) => {
addTimestampOnInsert(batchDF).write
.format(SNOWFLAKE_CONNECTOR_NAME)
.options(options)
.mode(mode)
.save()
})

val streamingQuery = streamWriter.start()

streamingQuery.awaitTermination(timeout)
streamingQuery.stop()
}

/**
* Add current timestamp to the data if needed.
*
* @param data the dataset to enrich.
* @tparam T the dataset type.
* @return the dataset with the timestamp column if needed.
*/
private def addTimestampOnInsert[T](data: Dataset[T]): DataFrame = {
if (addTimestampOnInsert) {
data.withColumn("timestamp", current_timestamp())
} else {
data.toDF()
}
}

/**
* Create a unique query name based on the output name and the target table.
*
* @return a unique query name.
*/
private[streaming] def createQueryName(): String = {

val dbTable: Option[String] = options.get("dbtable")

(outputName, dbTable) match {
case (Some(name), Some(table)) => s"QN_${name}_${table}_${java.util.UUID.randomUUID}"
case (Some(name), None) => s"QN_${name}_${java.util.UUID.randomUUID}"
case (None, Some(table)) => s"QN_${table}_${java.util.UUID.randomUUID}"
case _ => s"QN_${java.util.UUID.randomUUID}"
}

}
}

object SnowflakeOutput {
import com.amadeus.dataio.config.fields._

/**
* Creates a new instance of SnowflakeOutput from a typesafe Config object.
*
* @param config typesafe Config object containing the configuration fields.
* @return a new SnowflakeOutput object.
* @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
*/
def apply(implicit config: Config): SnowflakeOutput = {

val mode = config.getString("Mode")
val addTimestampOnInsert = Try(config.getBoolean("AddTimestampOnInsert")).getOrElse(false)

val duration = Try(config.getString("Duration")).toOption
val trigger = duration.map(Trigger.ProcessingTime)

val name = Try(config.getString("Name")).toOption

SnowflakeOutput(
timeout = getTimeout,
trigger = trigger,
mode = mode,
options = getOptions,
addTimestampOnInsert = addTimestampOnInsert,
config = config,
outputName = name
)
}
}
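A minimal streaming sketch for reviewers. It constructs the output directly rather than from config, to avoid guessing the exact configuration field names; the rate source, trigger interval, timeout, and all option values are placeholders:

import com.amadeus.dataio.pipes.snowflake.streaming.SnowflakeOutput
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

// Placeholder streaming source; any streaming Dataset/DataFrame works here.
val stream = spark.readStream.format("rate").load()

val output = SnowflakeOutput(
  timeout = 10 * 60 * 1000L, // awaitTermination timeout, in milliseconds
  trigger = Some(Trigger.ProcessingTime("1 minute")),
  mode = "append",
  options = Map(
    "sfUrl" -> "account.snowflakecomputing.com",
    "sfUser" -> "USER",
    "sfDatabase" -> "DB",
    "sfSchema" -> "SCHEMA",
    "dbtable" -> "MY_TABLE"
  ),
  addTimestampOnInsert = true,
  outputName = Some("my-snowflake-stream")
)

// Each micro-batch is written with the batch Snowflake connector (at-least-once delivery).
output.write(stream)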
@@ -0,0 +1,58 @@
package com.amadeus.dataio.pipes.snowflake.batch

import com.amadeus.dataio.testutils.JavaImplicitConverters._
import com.typesafe.config.{ConfigException, ConfigFactory}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class SnowflakeInputTest extends AnyWordSpec with Matchers {

"SnowflakeInput" should {
"be initialized according to configuration" in {
val config = ConfigFactory.parseMap(
Map(
"Input" -> Map(
"Name" -> "my-test-snowflake",
"Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput",
"Options" -> Map(
"sfDatabase" -> "TESTDATABASE",
"sfSchema" -> "TESTSCHEMA",
"sfUser" -> "TESTUSER",
"sfUrl" -> "snowflake.url.com",
"dbtable" -> "TESTTABLE"
)
)
)
)

val snowflakeInputObj = SnowflakeInput.apply(config.getConfig("Input"))

val expectedSnowflakeOptions = Map(
"sfDatabase" -> "TESTDATABASE",
"sfSchema" -> "TESTSCHEMA",
"sfUser" -> "TESTUSER",
"sfUrl" -> "snowflake.url.com",
"dbtable" -> "TESTTABLE"
)

snowflakeInputObj.options shouldEqual expectedSnowflakeOptions
}

"throw an exception when the mandatory options are missing" in {
val config = ConfigFactory.parseMap(
Map(
"Input" -> Map(
"Name" -> "my-test-snowflake",
"Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput"
)
)
)

def snowflakeInputObj = SnowflakeInput.apply(config.getConfig("Input"))

intercept[ConfigException](snowflakeInputObj)

}

}
}