diff --git a/build.sbt b/build.sbt
index daabe0c..7a9f208 100644
--- a/build.sbt
+++ b/build.sbt
@@ -16,7 +16,8 @@ ThisBuild / libraryDependencies ++= Seq(
   // Spark
   "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion.value,
   "org.apache.spark" %% "spark-sql" % sparkVersion.value,
-  "org.apache.spark" %% "spark-core" % sparkVersion.value
+  "org.apache.spark" %% "spark-core" % sparkVersion.value,
+  "net.snowflake" %% "spark-snowflake" % "2.15.0-spark_3.4"
 )
 
 // Tests configuration
diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala
new file mode 100644
index 0000000..26b1f97
--- /dev/null
+++ b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInput.scala
@@ -0,0 +1,47 @@
+package com.amadeus.dataio.pipes.snowflake.batch
+
+import com.amadeus.dataio.core.{Input, Logging}
+import com.amadeus.dataio.config.fields._
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+ * Allows reading batch data from Snowflake.
+ *
+ * @param options the Snowflake connector options.
+ * @param config contains the Typesafe Config object that was used at instantiation to configure this entity.
+ */
+case class SnowflakeInput(
+  options: Map[String, String],
+  config: Config = ConfigFactory.empty()
+) extends Input
+  with Logging {
+
+  val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"
+
+  /**
+   * Reads a batch of data from Snowflake.
+   *
+   * @param spark The SparkSession which will be used to read the data.
+   * @return The data that was read.
+   */
+  override def read(implicit spark: SparkSession): DataFrame = {
+    spark.read.format(SNOWFLAKE_CONNECTOR_NAME).options(options).load()
+  }
+
+}
+
+object SnowflakeInput {
+
+  /**
+   * Creates a new instance of SnowflakeInput from a Typesafe Config object.
+   *
+   * @param config Typesafe Config object containing the configuration fields.
+   * @return a new SnowflakeInput object.
+   * @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
+   */
+  def apply(implicit config: Config): SnowflakeInput = {
+    SnowflakeInput(options = getOptions, config = config)
+  }
+}
diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala
new file mode 100644
index 0000000..b5257ec
--- /dev/null
+++ b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutput.scala
@@ -0,0 +1,56 @@
+package com.amadeus.dataio.pipes.snowflake.batch
+
+import com.amadeus.dataio.config.fields._
+import com.amadeus.dataio.core.{Logging, Output}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.spark.sql.{Dataset, SparkSession}
+
+/**
+ * Allows writing batch data to Snowflake.
+ *
+ * @param mode the save mode to use (e.g. "append", "overwrite").
+ * @param options the Snowflake connector options.
+ * @param config the config object.
+ */
+case class SnowflakeOutput(
+  mode: String,
+  options: Map[String, String],
+  config: Config = ConfigFactory.empty()
+) extends Output
+  with Logging {
+
+  val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"
+
+  /**
+   * Writes data to this output.
+   *
+   * @param data The data to write.
+   * @param spark The SparkSession which will be used to write the data.
+   */
+  override def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {
+    data.write
+      .format(SNOWFLAKE_CONNECTOR_NAME)
+      .options(options)
+      .mode(mode)
+      .save()
+  }
+}
+
+object SnowflakeOutput {
+
+  /**
+   * Creates a new instance of SnowflakeOutput from a Typesafe Config object.
+   *
+   * @param config Typesafe Config object containing the configuration fields.
+   * @return a new SnowflakeOutput object.
+   * @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
+   */
+  def apply(implicit config: Config): SnowflakeOutput = {
+    val mode = config.getString("Mode")
+
+    SnowflakeOutput(mode = mode, options = getOptions, config = config)
+  }
+
+}
diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutput.scala
new file mode 100644
index 0000000..52dd4e1
--- /dev/null
+++ b/core/src/main/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutput.scala
@@ -0,0 +1,137 @@
+package com.amadeus.dataio.pipes.snowflake.streaming
+
+import com.amadeus.dataio.core.{Logging, Output}
+import org.apache.spark.sql.streaming.Trigger
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.spark.sql.functions.current_timestamp
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+
+import scala.util.Try
+
+/**
+ * Allows writing streaming data to Snowflake.
+ *
+ * /!\ It uses the Snowflake batch connector in foreachBatch and therefore guarantees only at-least-once delivery. /!\
+ *
+ * @param timeout the streaming query timeout, in milliseconds.
+ * @param trigger the trigger to be used for the streaming query.
+ * @param mode the save mode to use.
+ * @param options the Snowflake connector options.
+ * @param addTimestampOnInsert if true, a "timestamp" column with the current timestamp will be added to the data.
+ * @param config the config object.
+ * @param outputName the output name used to define the streaming query name.
+ */
+case class SnowflakeOutput(
+  timeout: Long,
+  trigger: Option[Trigger],
+  mode: String,
+  options: Map[String, String],
+  addTimestampOnInsert: Boolean,
+  config: Config = ConfigFactory.empty(),
+  outputName: Option[String]
+) extends Output
+  with Logging {
+
+  val SNOWFLAKE_CONNECTOR_NAME = "net.snowflake.spark.snowflake"
+
+  /**
+   * Writes data to this output.
+   *
+   * @param data The data to write.
+   * @param spark The SparkSession which will be used to write the data.
+   */
+  def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {
+    val dbTable: Option[String] = options.get("dbtable")
+    dbTable.foreach(table => logger.info(s"Writing dataframe to snowflake table $table"))
+    trigger.foreach(trigger => logger.info(s"Using trigger $trigger"))
+
+    logger.info(s"Add timestamp on insert: $addTimestampOnInsert")
+
+    val queryName = createQueryName()
+
+    var streamWriter = data.writeStream.queryName(queryName)
+
+    streamWriter = trigger match {
+      case Some(trigger) => streamWriter.trigger(trigger)
+      case _             => streamWriter
+    }
+
+    // Each micro-batch is written through the batch connector, hence the
+    // at-least-once delivery guarantee noted above.
+    streamWriter = streamWriter.foreachBatch((batchDF: Dataset[T], _: Long) => {
+      addTimestampOnInsert(batchDF).write
+        .format(SNOWFLAKE_CONNECTOR_NAME)
+        .options(options)
+        .mode(mode)
+        .save()
+    })
+
+    val streamingQuery = streamWriter.start()
+
+    streamingQuery.awaitTermination(timeout)
+    streamingQuery.stop()
+  }
+
+  /**
+   * Adds the current timestamp to the data if needed.
+   *
+   * @param data the dataset to enrich.
+   * @tparam T the dataset type.
+   * @return the dataset with the timestamp column added if needed.
+   */
+  private def addTimestampOnInsert[T](data: Dataset[T]): DataFrame = {
+    if (addTimestampOnInsert) {
+      data.withColumn("timestamp", current_timestamp())
+    } else {
+      data.toDF()
+    }
+  }
+
+  /**
+   * Creates a unique query name based on the output name and target table.
+   *
+   * @return a unique query name.
+   */
+  private[streaming] def createQueryName(): String = {
+    val dbTable: Option[String] = options.get("dbtable")
+
+    (outputName, dbTable) match {
+      case (Some(name), Some(table)) => s"QN_${name}_${table}_${java.util.UUID.randomUUID}"
+      case (Some(name), None)        => s"QN_${name}_${java.util.UUID.randomUUID}"
+      case (None, Some(table))       => s"QN_${table}_${java.util.UUID.randomUUID}"
+      case _                         => s"QN_${java.util.UUID.randomUUID}"
+    }
+  }
+}
+
+object SnowflakeOutput {
+  import com.amadeus.dataio.config.fields._
+
+  /**
+   * Creates a new instance of SnowflakeOutput from a Typesafe Config object.
+   *
+   * @param config Typesafe Config object containing the configuration fields.
+   * @return a new SnowflakeOutput object.
+   * @throws com.typesafe.config.ConfigException If any of the mandatory fields is not available in the config argument.
+   */
+  def apply(implicit config: Config): SnowflakeOutput = {
+    val mode                 = config.getString("Mode")
+    val addTimestampOnInsert = Try(config.getBoolean("AddTimestampOnInsert")).getOrElse(false)
+
+    val duration = Try(config.getString("Duration")).toOption
+    val trigger  = duration.map(Trigger.ProcessingTime)
+
+    val name = Try(config.getString("Name")).toOption
+
+    SnowflakeOutput(
+      timeout = getTimeout,
+      trigger = trigger,
+      mode = mode,
+      options = getOptions,
+      addTimestampOnInsert = addTimestampOnInsert,
+      config = config,
+      outputName = name
+    )
+  }
+}
diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala
new file mode 100644
index 0000000..f8d1a93
--- /dev/null
+++ b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeInputTest.scala
@@ -0,0 +1,58 @@
+package com.amadeus.dataio.pipes.snowflake.batch
+
+import com.amadeus.dataio.testutils.JavaImplicitConverters._
+import com.typesafe.config.{ConfigException, ConfigFactory}
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class SnowflakeInputTest extends AnyWordSpec with Matchers {
+
+  "SnowflakeInput" should {
+    "be initialized according to configuration" in {
+      val config = ConfigFactory.parseMap(
+        Map(
+          "Input" -> Map(
+            "Name" -> "my-test-snowflake",
+            "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput",
+            "Options" -> Map(
+              "sfDatabase" -> "TESTDATABASE",
+              "sfSchema" -> "TESTSCHEMA",
+              "sfUser" -> "TESTUSER",
+              "sfUrl" -> "snowflake.url.com",
+              "dbtable" -> "TESTTABLE"
+            )
+          )
+        )
+      )
+
+      val snowflakeInputObj = SnowflakeInput.apply(config.getConfig("Input"))
+
+      val expectedSnowflakeOptions = Map(
+        "sfDatabase" -> "TESTDATABASE",
+        "sfSchema" -> "TESTSCHEMA",
+        "sfUser" -> "TESTUSER",
+        "sfUrl" -> "snowflake.url.com",
+        "dbtable" -> "TESTTABLE"
+      )
+
+      snowflakeInputObj.options shouldEqual expectedSnowflakeOptions
+    }
+
+    "throw an exception when the mandatory options are missing" in {
+      val config = ConfigFactory.parseMap(
+        Map(
+          "Input" -> Map(
+            "Name" -> "my-test-snowflake",
+            "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput"
+          )
+        )
+      )
+
+      def snowflakeInputObj = SnowflakeInput.apply(config.getConfig("Input"))
+
+      intercept[ConfigException](snowflakeInputObj)
+    }
+  }
+}
diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala
new file mode 100644
index 0000000..a54a81c
--- /dev/null
+++ b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/batch/SnowflakeOutputTest.scala
@@ -0,0 +1,89 @@
+package com.amadeus.dataio.pipes.snowflake.batch
+
+import com.amadeus.dataio.testutils.JavaImplicitConverters._
+import com.typesafe.config.{ConfigException, ConfigFactory}
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class SnowflakeOutputTest extends AnyWordSpec with Matchers {
+
+  "SnowflakeOutput" should {
+    "be initialized according to configuration" in {
+      val config = ConfigFactory.parseMap(
+        Map(
+          "Output" -> Map(
+            "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeOutput",
+            "Name" -> "my-test-snowflake",
+            "Mode" -> "append",
+            "Options" -> Map(
+              "dbTable" -> "test-table",
+              "sfUrl" -> "http://snowflake.com",
+              "sfUser" -> "my-user",
+              "sfDatabase" -> "db",
+              "sfSchema" -> "test-schema",
+              "sfWarehouse" -> "warehouse",
+              "column_mapping" -> "name"
+            )
+          )
+        )
+      )
+
+      val snowflakeBatchOutput = SnowflakeOutput.apply(config.getConfig("Output"))
+
+      val expectedSnowflakeOptions = Map(
+        "dbTable" -> "test-table",
+        "sfUrl" -> "http://snowflake.com",
+        "sfUser" -> "my-user",
+        "sfDatabase" -> "db",
+        "sfSchema" -> "test-schema",
+        "sfWarehouse" -> "warehouse",
+        "column_mapping" -> "name"
+      )
+
+      snowflakeBatchOutput.options shouldEqual expectedSnowflakeOptions
+      snowflakeBatchOutput.mode shouldEqual "append"
+    }
+
+    "throw an exception when mode is missing" in {
+      val config = ConfigFactory.parseMap(
+        Map(
+          "Output" -> Map(
+            "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeOutput",
+            "Name" -> "my-test-snowflake",
+            "Options" -> Map(
+              "dbTable" -> "test-table",
+              "sfUrl" -> "http://snowflake.com",
+              "sfUser" -> "my-user",
+              "sfDatabase" -> "db",
+              "sfSchema" -> "test-schema",
+              "sfWarehouse" -> "warehouse",
+              "column_mapping" -> "name"
+            )
+          )
+        )
+      )
+
+      def snowflakeBatchOutput = SnowflakeOutput.apply(config.getConfig("Output"))
+      intercept[ConfigException](snowflakeBatchOutput)
+    }
+
+    "throw an exception when options are missing" in {
+      val config = ConfigFactory.parseMap(
+        Map(
+          "Output" -> Map(
+            "Type" -> "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeOutput",
+            "Name" -> "my-test-snowflake"
+          )
+        )
+      )
+
+      def snowflakeBatchOutput = SnowflakeOutput.apply(config.getConfig("Output"))
+      intercept[ConfigException](snowflakeBatchOutput)
+    }
+  }
+}
diff --git a/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala
new file mode 100644
index 0000000..7448b85
--- /dev/null
+++ b/core/src/test/scala/com/amadeus/dataio/pipes/snowflake/streaming/SnowflakeOutputTest.scala
@@ -0,0 +1,146 @@
+package com.amadeus.dataio.pipes.snowflake.streaming
+
+import com.amadeus.dataio.testutils.JavaImplicitConverters._
+import com.typesafe.config.ConfigFactory
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+import org.apache.spark.sql.streaming.Trigger
+
+import scala.concurrent.duration.Duration
+
+class SnowflakeOutputTest extends AnyWordSpec with Matchers {
+
+  "SnowflakeOutput" should {
+    "be initialized according to configuration" in {
+      val config = ConfigFactory.parseMap(
+        Map(
+          "Output" -> Map(
+            "Type" -> "com.amadeus.dataio.pipes.snowflake.streaming.SnowflakeOutput",
+            "Name" -> "my-test-snowflake",
+            "Mode" -> "append",
+            "Duration" -> "60 seconds",
+            "Timeout" -> "24",
+            "Options" -> Map(
+              "dbtable" -> "test-table",
+              "sfUrl" -> "http://snowflake.com",
+              "sfUser" -> "my-user",
+              "sfDatabase" -> "db",
+              "sfSchema" -> "test-schema",
+              "sfWarehouse" -> "test-warehouse",
+              "sfRole" -> "tester"
+            )
+          )
+        )
+      )
+
+      val snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output"))
+
+      val expectedSnowflakeOptions = Map(
+        "sfUrl" -> "http://snowflake.com",
+        "sfUser" -> "my-user",
+        "sfDatabase" -> "db",
+        "sfSchema" -> "test-schema",
+        "sfWarehouse" -> "test-warehouse",
+        "sfRole" -> "tester",
+        "dbtable" -> "test-table"
+      )
+
+      snowflakeStreamOutput.outputName shouldEqual Some("my-test-snowflake")
+      snowflakeStreamOutput.options shouldEqual expectedSnowflakeOptions
+      snowflakeStreamOutput.mode shouldEqual "append"
+      snowflakeStreamOutput.addTimestampOnInsert shouldEqual false
+      snowflakeStreamOutput.trigger shouldEqual Some(Trigger.ProcessingTime(Duration("60 seconds")))
+      snowflakeStreamOutput.timeout shouldEqual 24 * 60 * 60 * 1000
+    }
+
+    "be initialized according to configuration with addTimestampOnInsert to true" in {
+      val config = ConfigFactory.parseMap(
+        Map(
+          "Output" -> Map(
+            "Type" -> "com.amadeus.dataio.pipes.snowflake.streaming.SnowflakeOutput",
+            "Name" -> "my-test-snowflake",
+            "Mode" -> "append",
+            "Duration" -> "60 seconds",
+            "Timeout" -> "24",
+            "AddTimestampOnInsert" -> true,
+            "Options" -> Map(
+              "dbtable" -> "test-table",
+              "sfUrl" -> "http://snowflake.com",
+              "sfUser" -> "my-user",
+              "sfDatabase" -> "db",
+              "sfSchema" -> "test-schema",
+              "sfWarehouse" -> "test-warehouse",
+              "sfRole" -> "tester"
+            )
+          )
+        )
+      )
+
+      val snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output"))
+
+      val expectedSnowflakeOptions = Map(
+        "sfUrl" -> "http://snowflake.com",
+        "sfUser" -> "my-user",
+        "sfDatabase" -> "db",
+        "sfSchema" -> "test-schema",
+        "sfWarehouse" -> "test-warehouse",
+        "sfRole" -> "tester",
+        "dbtable" -> "test-table"
+      )
+
+      snowflakeStreamOutput.outputName shouldEqual Some("my-test-snowflake")
+      snowflakeStreamOutput.options shouldEqual expectedSnowflakeOptions
+      snowflakeStreamOutput.mode shouldEqual "append"
+      snowflakeStreamOutput.addTimestampOnInsert shouldEqual true
+      snowflakeStreamOutput.trigger shouldEqual Some(Trigger.ProcessingTime(Duration("60 seconds")))
+      snowflakeStreamOutput.timeout shouldEqual 24 * 60 * 60 * 1000
+    }
+
+    "be initialized according to configuration with addTimestampOnInsert to false" in {
+      val config = ConfigFactory.parseMap(
+        Map(
+          "Output" -> Map(
+            "Type" -> "com.amadeus.dataio.pipes.snowflake.streaming.SnowflakeOutput",
+            "Name" -> "my-test-snowflake",
+            "Mode" -> "append",
+            "Duration" -> "60 seconds",
+            "Timeout" -> "24",
+            "AddTimestampOnInsert" -> false,
+            "Options" -> Map(
+              "dbtable" -> "test-table",
+              "sfUrl" -> "http://snowflake.com",
+              "sfUser" -> "my-user",
+              "sfDatabase" -> "db",
+              "sfSchema" -> "test-schema",
+              "sfWarehouse" -> "test-warehouse",
+              "sfRole" -> "tester"
+            )
+          )
+        )
+      )
+
+      val snowflakeStreamOutput = SnowflakeOutput.apply(config.getConfig("Output"))
+
+      val expectedSnowflakeOptions = Map(
+        "dbtable" -> "test-table",
+        "sfUrl" -> "http://snowflake.com",
+        "sfUser" -> "my-user",
+        "sfDatabase" -> "db",
+        "sfSchema" -> "test-schema",
+        "sfWarehouse" -> "test-warehouse",
+        "sfRole" -> "tester"
+      )
+
+      snowflakeStreamOutput.outputName shouldEqual Some("my-test-snowflake")
+      snowflakeStreamOutput.options shouldEqual expectedSnowflakeOptions
+      snowflakeStreamOutput.mode shouldEqual "append"
+      snowflakeStreamOutput.addTimestampOnInsert shouldEqual false
+      snowflakeStreamOutput.trigger shouldEqual Some(Trigger.ProcessingTime(Duration("60 seconds")))
+      snowflakeStreamOutput.timeout shouldEqual 24 * 60 * 60 * 1000
+    }
+  }
+}
diff --git a/docs/content/_data/config/pipes/snowflake/batch.yaml b/docs/content/_data/config/pipes/snowflake/batch.yaml
new file mode 100644
index 0000000..c14089b
--- /dev/null
+++ b/docs/content/_data/config/pipes/snowflake/batch.yaml
@@ -0,0 +1,12 @@
+input:
+  type: com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput
+
+
+output:
+  type: com.amadeus.dataio.pipes.snowflake.batch.SnowflakeOutput
+  fields:
+    - name: Mode
+      mandatory: "Yes"
+      description: Writing mode on the Snowflake table.
+      example: Mode = "append"
+
diff --git a/docs/content/_data/config/pipes/snowflake/common.yaml b/docs/content/_data/config/pipes/snowflake/common.yaml
new file mode 100644
index 0000000..9fd9a8a
--- /dev/null
+++ b/docs/content/_data/config/pipes/snowflake/common.yaml
@@ -0,0 +1,16 @@
+name: Snowflake
+description: Allows connecting to Snowflake to automatically retrieve data from, or publish data to, a Snowflake table.
+
+code_directory: snowflake
+
+links:
+  - name: Spark Snowflake connector documentation
+    url: https://docs.snowflake.com/en/user-guide/spark-connector-use#using-the-connector-in-scala
+
+fields:
+  - name: Options
+    description: Snowflake options, specified as key = value pairs. These options are then passed as-is to the Spark connector for Snowflake.
+    example: |-
+      Options {
+        Database = "example_database"
+        Schema = "example_schema"
+        ...
+      }
diff --git a/docs/content/_data/config/pipes/snowflake/streaming.yaml b/docs/content/_data/config/pipes/snowflake/streaming.yaml
new file mode 100644
index 0000000..3b9c47d
--- /dev/null
+++ b/docs/content/_data/config/pipes/snowflake/streaming.yaml
@@ -0,0 +1,21 @@
+output:
+  type: com.amadeus.dataio.pipes.snowflake.streaming.SnowflakeOutput
+  fields:
+    - name: Duration
+      mandatory: "No"
+      description: Sets the trigger for the streaming query, controlling Spark's trigger() function.
+      example: Duration = "60 seconds"
+    - name: Timeout
+      mandatory: "Yes"
+      description: Controls the amount of time before returning from the streaming query, in hours. It can be a String or an Int.
+      example: Timeout = 24
+    - name: Mode
+      mandatory: "Yes"
+      description: The Spark Structured Streaming output mode.
+      example: Mode = "complete"
+    - name: AddTimestampOnInsert
+      mandatory: "No"
+      description: Adds a column named "timestamp" containing the current timestamp at the start of query evaluation.
+      example: AddTimestampOnInsert = "true"
+      default: false
diff --git a/docs/content/configuration/pipes/snowflake.md b/docs/content/configuration/pipes/snowflake.md
new file mode 100644
index 0000000..42c5a55
--- /dev/null
+++ b/docs/content/configuration/pipes/snowflake.md
@@ -0,0 +1,10 @@
+---
+title: Snowflake
+layout: default
+grand_parent: Configuration
+parent: Pipes
+---
+
+# Snowflake
+{% include pipe_description.md pipe=site.data.config.pipes.snowflake %}
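
Usage sketch (for reviewers, not part of the patch): the snippet below shows how the new batch pipes are meant to be wired together, mirroring the configuration shape exercised by the unit tests above. The object name SnowflakeRoundTrip, the table names, and the account values are placeholders, and a real connection would also need authentication options (such as sfPassword), which the connector requires but the tests deliberately omit.

import com.amadeus.dataio.pipes.snowflake.batch.{SnowflakeInput, SnowflakeOutput}
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession

object SnowflakeRoundTrip {
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession
      .builder()
      .appName("snowflake-roundtrip")
      .master("local[*]")
      .getOrCreate()

    // Same configuration shape as in the unit tests; all values are placeholders.
    val config = ConfigFactory.parseString(
      """
        |Input {
        |  Name = "snowflake-source"
        |  Type = "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeInput"
        |  Options {
        |    sfUrl = "account.snowflakecomputing.com"
        |    sfUser = "TESTUSER"
        |    sfDatabase = "TESTDATABASE"
        |    sfSchema = "TESTSCHEMA"
        |    dbtable = "SOURCE_TABLE"
        |  }
        |}
        |Output {
        |  Name = "snowflake-sink"
        |  Type = "com.amadeus.dataio.pipes.snowflake.batch.SnowflakeOutput"
        |  Mode = "append"
        |  Options {
        |    sfUrl = "account.snowflakecomputing.com"
        |    sfUser = "TESTUSER"
        |    sfDatabase = "TESTDATABASE"
        |    sfSchema = "TESTSCHEMA"
        |    dbtable = "TARGET_TABLE"
        |  }
        |}
        |""".stripMargin
    )

    val input  = SnowflakeInput.apply(config.getConfig("Input"))
    val output = SnowflakeOutput.apply(config.getConfig("Output"))

    // read() pulls the source table through the Snowflake connector;
    // write() saves the dataset back using the configured save mode.
    val df = input.read
    output.write(df)

    spark.stop()
  }
}

The streaming output follows the same pattern, with Duration, Timeout and AddTimestampOnInsert controlling the trigger, awaitTermination and timestamp-enrichment behaviour implemented in streaming/SnowflakeOutput.scala.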