diff --git a/examples/java-maven/pom.xml b/examples/java-maven/pom.xml index 3281f64..bc24ac1 100644 --- a/examples/java-maven/pom.xml +++ b/examples/java-maven/pom.xml @@ -25,12 +25,15 @@ 0.0.1-SNAPSHOT - 2.12 - 3.3.3 + 2.13 + 3.5.1 1.7.36 - 0.1.0 + 0.2.1 1.8 1.8 + spark-bigtable-scala2.13 + + @@ -49,7 +52,7 @@ com.google.cloud.spark.bigtable - spark-bigtable_2.12 + ${spark.bigtable.artifact} ${spark.bigtable.version} diff --git a/examples/scala-sbt/build.sbt b/examples/scala-sbt/build.sbt index 4ea9c63..64ee45b 100644 --- a/examples/scala-sbt/build.sbt +++ b/examples/scala-sbt/build.sbt @@ -14,16 +14,26 @@ * limitations under the License. */ -name := "spark-bigtable-example" +/** build settings for scala 2.12 */ +/* +name := "spark-bigtable-example-scala2.12" +version := "0.1" +scalaVersion := "2.12.18" +val sparkBigtable = "spark-bigtable-scala2.12" +*/ +/** build settings for scala 2.13 */ +name := "spark-bigtable-example-scala2.13" version := "0.1" +scalaVersion := "2.13.14" +val sparkBigtable = "spark-bigtable-scala2.13" -scalaVersion := "2.12.19" -val sparkVersion = "3.0.1" +val sparkVersion = "3.5.1" -libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_2.12" % "0.1.0" +resolvers += Resolver.mavenLocal libraryDependencies ++= Seq( + "com.google.cloud.spark.bigtable" % sparkBigtable % "0.2.1", "org.apache.spark" %% "spark-sql" % sparkVersion % Provided, "org.slf4j" % "slf4j-reload4j" % "1.7.36", ) diff --git a/pom.xml b/pom.xml index a68ed68..bd793a8 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ limitations under the License. --> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 com.google.cloud.spark.bigtable @@ -25,7 +25,6 @@ Spark Bigtable Connector Build Parent Parent project for all the Spark Bigtable Connector artifacts https://github.com/GoogleCloudDataproc/spark-bigtable-connector - Apache License, Version 2.0 @@ -52,8 +51,8 @@ - spark-bigtable_2.12 - spark-bigtable_2.12-it + spark-bigtable + spark-bigtable-core-it @@ -77,8 +76,8 @@ 2.42.0 0.175.0 - 1.8 - 1.8 + 17 + 17 3.2.16 3.0.0-M5 4.13.2 diff --git a/spark-bigtable_2.12-it/pom.xml b/spark-bigtable-core-it/pom.xml similarity index 98% rename from spark-bigtable_2.12-it/pom.xml rename to spark-bigtable-core-it/pom.xml index df3ac19..41211b4 100644 --- a/spark-bigtable_2.12-it/pom.xml +++ b/spark-bigtable-core-it/pom.xml @@ -22,11 +22,11 @@ com.google.cloud.spark.bigtable spark-bigtable-connector 0.2.1 - ../ + ../pom.xml com.google.cloud.spark.bigtable - spark-bigtable_2.12-it + spark-bigtable-core-it Google Bigtable - Spark Connector Integration Tests 0.2.1 diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/AbstractTestBase.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/AbstractTestBase.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/AbstractTestBase.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/AbstractTestBase.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/DataBoostIntegrationTest.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/DataBoostIntegrationTest.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/DataBoostIntegrationTest.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/DataBoostIntegrationTest.java diff --git 
a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/ErrorHandlingIntegrationTest.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/ErrorHandlingIntegrationTest.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/ErrorHandlingIntegrationTest.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/ErrorHandlingIntegrationTest.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/FilterPushDownFuzzTest.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/FilterPushDownFuzzTest.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/FilterPushDownFuzzTest.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/FilterPushDownFuzzTest.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/OpenLineageIntegrationTest.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/OpenLineageIntegrationTest.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/OpenLineageIntegrationTest.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/OpenLineageIntegrationTest.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/RDDReadWriteIntegrationTests.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/RDDReadWriteIntegrationTests.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/RDDReadWriteIntegrationTests.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/RDDReadWriteIntegrationTests.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/RandomDataFrameGenerator.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/RandomDataFrameGenerator.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/RandomDataFrameGenerator.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/RandomDataFrameGenerator.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/ReadWriteIntegrationTest.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/ReadWriteIntegrationTest.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/ReadWriteIntegrationTest.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/ReadWriteIntegrationTest.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/ReadWriteLongRunningTest.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/ReadWriteLongRunningTest.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/ReadWriteLongRunningTest.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/ReadWriteLongRunningTest.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/WriteFuzzTest.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/WriteFuzzTest.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/WriteFuzzTest.java rename to 
spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/WriteFuzzTest.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/model/Favorites.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/model/Favorites.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/model/Favorites.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/model/Favorites.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/model/TestAvroRow.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/model/TestAvroRow.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/model/TestAvroRow.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/model/TestAvroRow.java diff --git a/spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/model/TestRow.java b/spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/model/TestRow.java similarity index 100% rename from spark-bigtable_2.12-it/src/test/java/com/google/cloud/spark/bigtable/model/TestRow.java rename to spark-bigtable-core-it/src/test/java/com/google/cloud/spark/bigtable/model/TestRow.java diff --git a/spark-bigtable_2.12/src/main/resources/META-INF/license/apache 2.0 - license-2.0.txt b/spark-bigtable-core/src/main/resources/META-INF/license/apache 2.0 - license-2.0.txt similarity index 100% rename from spark-bigtable_2.12/src/main/resources/META-INF/license/apache 2.0 - license-2.0.txt rename to spark-bigtable-core/src/main/resources/META-INF/license/apache 2.0 - license-2.0.txt diff --git a/spark-bigtable_2.12/src/main/resources/META-INF/license/bsd-3-clause - license.txt b/spark-bigtable-core/src/main/resources/META-INF/license/bsd-3-clause - license.txt similarity index 100% rename from spark-bigtable_2.12/src/main/resources/META-INF/license/bsd-3-clause - license.txt rename to spark-bigtable-core/src/main/resources/META-INF/license/bsd-3-clause - license.txt diff --git a/spark-bigtable_2.12/src/main/resources/META-INF/license/cddl + gplv2 with classpath exception - license.txt b/spark-bigtable-core/src/main/resources/META-INF/license/cddl + gplv2 with classpath exception - license.txt similarity index 100% rename from spark-bigtable_2.12/src/main/resources/META-INF/license/cddl + gplv2 with classpath exception - license.txt rename to spark-bigtable-core/src/main/resources/META-INF/license/cddl + gplv2 with classpath exception - license.txt diff --git a/spark-bigtable_2.12/src/main/resources/META-INF/license/go license - license.txt b/spark-bigtable-core/src/main/resources/META-INF/license/go license - license.txt similarity index 100% rename from spark-bigtable_2.12/src/main/resources/META-INF/license/go license - license.txt rename to spark-bigtable-core/src/main/resources/META-INF/license/go license - license.txt diff --git a/spark-bigtable_2.12/src/main/resources/META-INF/license/mit license - mit.txt b/spark-bigtable-core/src/main/resources/META-INF/license/mit license - mit.txt similarity index 100% rename from spark-bigtable_2.12/src/main/resources/META-INF/license/mit license - mit.txt rename to spark-bigtable-core/src/main/resources/META-INF/license/mit license - mit.txt diff --git a/spark-bigtable_2.12/src/main/resources/META-INF/services/io.openlineage.spark.extension.OpenLineageExtensionProvider 
b/spark-bigtable-core/src/main/resources/META-INF/services/io.openlineage.spark.extension.OpenLineageExtensionProvider similarity index 100% rename from spark-bigtable_2.12/src/main/resources/META-INF/services/io.openlineage.spark.extension.OpenLineageExtensionProvider rename to spark-bigtable-core/src/main/resources/META-INF/services/io.openlineage.spark.extension.OpenLineageExtensionProvider diff --git a/spark-bigtable_2.12/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark-bigtable-core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister similarity index 100% rename from spark-bigtable_2.12/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister rename to spark-bigtable-core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/BigtableDefaultSource.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/BigtableDefaultSource.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/BigtableDefaultSource.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/BigtableDefaultSource.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/BigtableRDD.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/BigtableRDD.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/BigtableRDD.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/BigtableRDD.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/SparkBigtableLineageProvider.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/SparkBigtableLineageProvider.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/SparkBigtableLineageProvider.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/SparkBigtableLineageProvider.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableDataClientBuilder.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableDataClientBuilder.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableDataClientBuilder.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableDataClientBuilder.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableSparkConf.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableSparkConf.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableSparkConf.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableSparkConf.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableTableScanRDD.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableTableScanRDD.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableTableScanRDD.scala rename to 
spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableTableScanRDD.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/ByteConversions.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/datasources/ByteConversions.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/ByteConversions.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/datasources/ByteConversions.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/EqualToFilterAdapter.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/EqualToFilterAdapter.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/EqualToFilterAdapter.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/EqualToFilterAdapter.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/GreaterThanFilterAdapter.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/GreaterThanFilterAdapter.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/GreaterThanFilterAdapter.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/GreaterThanFilterAdapter.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/GreaterThanOrEqualFilterAdapter.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/GreaterThanOrEqualFilterAdapter.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/GreaterThanOrEqualFilterAdapter.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/GreaterThanOrEqualFilterAdapter.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/LessThanFilterAdapter.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/LessThanFilterAdapter.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/LessThanFilterAdapter.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/LessThanFilterAdapter.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/LessThanOrEqualFilterAdapter.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/LessThanOrEqualFilterAdapter.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/LessThanOrEqualFilterAdapter.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/LessThanOrEqualFilterAdapter.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/RowKeyWrapper.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/RowKeyWrapper.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/RowKeyWrapper.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/RowKeyWrapper.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/SparkSqlFilterAdapter.scala 
b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/SparkSqlFilterAdapter.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/SparkSqlFilterAdapter.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/SparkSqlFilterAdapter.scala diff --git a/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/StringStartsWithFilterAdapter.scala b/spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/StringStartsWithFilterAdapter.scala similarity index 100% rename from spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/filters/StringStartsWithFilterAdapter.scala rename to spark-bigtable-core/src/main/scala/com/google/cloud/spark/bigtable/filters/StringStartsWithFilterAdapter.scala diff --git a/spark-bigtable_2.12/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeCustomDataService.java b/spark-bigtable-core/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeCustomDataService.java similarity index 100% rename from spark-bigtable_2.12/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeCustomDataService.java rename to spark-bigtable-core/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeCustomDataService.java diff --git a/spark-bigtable_2.12/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeGenericDataService.java b/spark-bigtable-core/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeGenericDataService.java similarity index 100% rename from spark-bigtable_2.12/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeGenericDataService.java rename to spark-bigtable-core/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeGenericDataService.java diff --git a/spark-bigtable_2.12/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeServerBuilder.java b/spark-bigtable-core/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeServerBuilder.java similarity index 100% rename from spark-bigtable_2.12/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeServerBuilder.java rename to spark-bigtable-core/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeServerBuilder.java diff --git a/spark-bigtable_2.12/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeServerCommon.java b/spark-bigtable-core/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeServerCommon.java similarity index 100% rename from spark-bigtable_2.12/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeServerCommon.java rename to spark-bigtable-core/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeServerCommon.java diff --git a/spark-bigtable_2.12/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeTableAdminService.java b/spark-bigtable-core/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeTableAdminService.java similarity index 100% rename from spark-bigtable_2.12/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeTableAdminService.java rename to spark-bigtable-core/src/test/java/com/google/cloud/spark/bigtable/fakeserver/FakeTableAdminService.java diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/BigtableTableScanRDDTest.scala b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/BigtableTableScanRDDTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/BigtableTableScanRDDTest.scala rename to 
spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/BigtableTableScanRDDTest.scala diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/ByteConversionsTest.scala b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/ByteConversionsTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/ByteConversionsTest.scala rename to spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/ByteConversionsTest.scala diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/ReadRowConversionsTest.scala b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/ReadRowConversionsTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/ReadRowConversionsTest.scala rename to spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/ReadRowConversionsTest.scala diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/WriteRowConversionsTest.scala b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/WriteRowConversionsTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/WriteRowConversionsTest.scala rename to spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/WriteRowConversionsTest.scala diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/EqualToFilterAdapterTest.scala b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/EqualToFilterAdapterTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/EqualToFilterAdapterTest.scala rename to spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/EqualToFilterAdapterTest.scala diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/GreaterThanFilterAdapterTest.scala b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/GreaterThanFilterAdapterTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/GreaterThanFilterAdapterTest.scala rename to spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/GreaterThanFilterAdapterTest.scala diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/GreaterThanOrEqualFilterAdapterTest.scala b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/GreaterThanOrEqualFilterAdapterTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/GreaterThanOrEqualFilterAdapterTest.scala rename to spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/GreaterThanOrEqualFilterAdapterTest.scala diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/LessThanFilterAdapterTest.scala b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/LessThanFilterAdapterTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/LessThanFilterAdapterTest.scala rename to spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/LessThanFilterAdapterTest.scala diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/LessThanOrEqualFilterAdapterTest.scala 
b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/LessThanOrEqualFilterAdapterTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/LessThanOrEqualFilterAdapterTest.scala rename to spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/LessThanOrEqualFilterAdapterTest.scala diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/SparkSqlFilterAdapterTest.scala b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/SparkSqlFilterAdapterTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/SparkSqlFilterAdapterTest.scala rename to spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/SparkSqlFilterAdapterTest.scala diff --git a/spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/StringStartsWithFilterAdapterTest.scala b/spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/StringStartsWithFilterAdapterTest.scala similarity index 100% rename from spark-bigtable_2.12/src/test/scala/com/google/cloud/spark/bigtable/filters/StringStartsWithFilterAdapterTest.scala rename to spark-bigtable-core/src/test/scala/com/google/cloud/spark/bigtable/filters/StringStartsWithFilterAdapterTest.scala diff --git a/spark-bigtable_2.12/test-pyspark/load_test.py b/spark-bigtable-core/test-pyspark/load_test.py similarity index 100% rename from spark-bigtable_2.12/test-pyspark/load_test.py rename to spark-bigtable-core/test-pyspark/load_test.py diff --git a/spark-bigtable_2.12/test-pyspark/read_and_write_test.py b/spark-bigtable-core/test-pyspark/read_and_write_test.py similarity index 100% rename from spark-bigtable_2.12/test-pyspark/read_and_write_test.py rename to spark-bigtable-core/test-pyspark/read_and_write_test.py diff --git a/spark-bigtable_2.12/test-pyspark/test_base.py b/spark-bigtable-core/test-pyspark/test_base.py similarity index 100% rename from spark-bigtable_2.12/test-pyspark/test_base.py rename to spark-bigtable-core/test-pyspark/test_base.py diff --git a/spark-bigtable_2.12/pom.xml b/spark-bigtable/pom.xml similarity index 62% rename from spark-bigtable_2.12/pom.xml rename to spark-bigtable/pom.xml index e4eff93..5b300e5 100644 --- a/spark-bigtable_2.12/pom.xml +++ b/spark-bigtable/pom.xml @@ -14,21 +14,24 @@ See the License for the specific language governing permissions and limitations under the License. 
--> - 4.0.0 - com.google.cloud.spark.bigtable spark-bigtable-connector - 0.2.1 - ../ + 0.2.1 - com.google.cloud.spark.bigtable - spark-bigtable_2.12 - Google Bigtable - Apache Spark Connector - 0.2.1 + spark-bigtable + 0.2.1 + pom + + + spark-bigtable-scala2.12 + spark-bigtable-scala2.13 + @@ -41,72 +44,11 @@ grpc-google-cloud-bigtable-admin-v2 ${bigtable.java.version} - - - - org.scala-lang.modules - scala-parser-combinators_${scala.binary.version} - ${scala-parser-combinators.version} - - - - org.scala-lang - scala-library - ${scala.version} - provided - - - - org.apache.spark - spark-core_${scala.binary.version} - ${spark.version} - provided - - - org.scala-lang - scala-library - - - org.scala-lang - scalap - - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - - - org.apache.spark - spark-sql_${scala.binary.version} - ${spark.version} - provided - - org.slf4j slf4j-reload4j ${slf4j-reload4j.version} - - - org.scalatest - scalatest_${scala.binary.version} - ${scalatest.version} - test - - - org.scala-lang - scala-library - - - - io.openlineage spark-extension-interfaces @@ -129,6 +71,11 @@ + + + ../../spark-bigtable-core/src/main/resources + + net.alchim31.maven @@ -148,46 +95,6 @@ maven-compiler-plugin 3.10.1 - - org.codehaus.mojo - build-helper-maven-plugin - 3.3.0 - - - add-source - generate-sources - - add-source - - - - src/main/scala - - src/test/java - - ../third_party/hbase-spark-connector/hbase-connectors/src/main/scala/ - - ${project.build.directory}/sources - - - - - add-test-source - generate-sources - - add-test-source - - - - src/test/scala - ../third_party/hbase-spark-connector/hbase-connectors/src/test/scala/ - - - - - org.apache.maven.plugins maven-shade-plugin @@ -201,16 +108,16 @@ + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> false + implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"> + implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> ${app.main.class} ${maven.compile.source} @@ -323,4 +230,4 @@ - + \ No newline at end of file diff --git a/spark-bigtable/spark-bigtable-scala2.12/pom.xml b/spark-bigtable/spark-bigtable-scala2.12/pom.xml new file mode 100644 index 0000000..b7a0738 --- /dev/null +++ b/spark-bigtable/spark-bigtable-scala2.12/pom.xml @@ -0,0 +1,145 @@ + + + + 4.0.0 + + com.google.cloud.spark.bigtable + spark-bigtable + 0.2.1 + + + spark-bigtable-scala2.12 + 0.2.1 + + + 2.12.18 + 2.12 + 3.5.1 + + + + + org.scala-lang.modules + scala-parser-combinators_${scala.binary.version} + ${scala-parser-combinators.version} + + + + org.scala-lang + scala-library + ${scala.version} + provided + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + org.scala-lang + scala-library + + + org.scala-lang + scalap + + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + org.scalatest + scalatest_${scala.binary.version} + ${scalatest.version} + test + + + org.scala-lang + scala-library + + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.3.0 + + + add-source + generate-sources + + add-source + + + + ../../spark-bigtable-core/src/main/scala + ../../spark-bigtable-core/src/test/java + + + ../../third_party/hbase-spark-connector/hbase-connectors/src/main/scala/ + + + 
../../third_party/hbase-spark-connector/hbase-connectors-scala2.12/src/main/scala/ + + + ${project.build.directory}/sources + + + + + add-test-source + generate-sources + + add-test-source + + + + src/test/scala + ../third_party/hbase-spark-connector/hbase-connectors/src/test/scala/ + + + + + + + + \ No newline at end of file diff --git a/spark-bigtable/spark-bigtable-scala2.13/pom.xml b/spark-bigtable/spark-bigtable-scala2.13/pom.xml new file mode 100644 index 0000000..cea6693 --- /dev/null +++ b/spark-bigtable/spark-bigtable-scala2.13/pom.xml @@ -0,0 +1,130 @@ + + + 4.0.0 + + com.google.cloud.spark.bigtable + spark-bigtable + 0.2.1 + + + spark-bigtable-scala2.13 + 0.2.1 + + + 2.13.14 + 2.13 + 3.5.1 + + + + + org.scala-lang.modules + scala-parser-combinators_${scala.binary.version} + ${scala-parser-combinators.version} + + + + org.scala-lang + scala-library + ${scala.version} + provided + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + org.scala-lang + scala-library + + + org.scala-lang + scalap + + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + org.scalatest + scalatest_${scala.binary.version} + ${scalatest.version} + test + + + org.scala-lang + scala-library + + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.3.0 + + + add-source + generate-sources + + add-source + + + + ../../spark-bigtable-core/src/main/scala + + ../../spark-bigtable-core/src/test/java + + ../../third_party/hbase-spark-connector/hbase-connectors/src/main/scala/ + + + ../../third_party/hbase-spark-connector/hbase-connectors-scala2.13/src/main/scala/ + + + ${project.build.directory}/sources + + + + + add-test-source + generate-sources + + add-test-source + + + + src/test/scala + ../third_party/hbase-spark-connector/hbase-connectors/src/test/scala/ + + + + + + + + \ No newline at end of file diff --git a/third_party/hbase-spark-connector/hbase-connectors/src/main/scala/com/google/cloud/spark/bigtable/datasources/SchemaConverters.scala b/third_party/hbase-spark-connector/hbase-connectors-scala2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/SchemaConverters.scala similarity index 100% rename from third_party/hbase-spark-connector/hbase-connectors/src/main/scala/com/google/cloud/spark/bigtable/datasources/SchemaConverters.scala rename to third_party/hbase-spark-connector/hbase-connectors-scala2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/SchemaConverters.scala diff --git a/third_party/hbase-spark-connector/hbase-connectors-scala2.13/src/main/scala/com/google/cloud/spark/bigtable/datasources/SchemaConverters.scala b/third_party/hbase-spark-connector/hbase-connectors-scala2.13/src/main/scala/com/google/cloud/spark/bigtable/datasources/SchemaConverters.scala new file mode 100644 index 0000000..797bcc1 --- /dev/null +++ b/third_party/hbase-spark-connector/hbase-connectors-scala2.13/src/main/scala/com/google/cloud/spark/bigtable/datasources/SchemaConverters.scala @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spark.bigtable.datasources + +import org.apache.avro.Schema.Type._ +import org.apache.avro.SchemaBuilder.{BaseFieldTypeBuilder, BaseTypeBuilder, FieldAssembler, FieldDefault, RecordBuilder} +import org.apache.avro.generic.GenericData.{Fixed, Record} +import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord} +import org.apache.avro.io._ +import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.commons.io.output.ByteArrayOutputStream +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ +import org.apache.yetus.audience.InterfaceAudience + +import java.io.ByteArrayInputStream +import java.nio.ByteBuffer +import java.sql.Timestamp +import java.util +import java.util.HashMap +import scala.jdk.CollectionConverters._ + +@InterfaceAudience.Private +abstract class AvroException(msg: String) extends Exception(msg) + +@InterfaceAudience.Private +case class SchemaConversionException(msg: String) extends AvroException(msg) + +/** + * At the top level, the converters provide three high-level interfaces: + * 1. toSqlType: This function takes an avro schema and returns a sql schema. + * 2. createConverterToSQL: Returns a function that is used to convert avro types to their + * corresponding sparkSQL representations. + * 3. convertTypeToAvro: This function constructs a converter function for a given sparkSQL + * datatype. This is used when writing Avro records out to disk. + */ +@InterfaceAudience.Private +object SchemaConverters { + + case class SchemaType(dataType: DataType, nullable: Boolean) + + /** This function takes an avro schema and returns a sql schema.
+ */ + def toSqlType(avroSchema: Schema): SchemaType = { + avroSchema.getType match { + case INT => SchemaType(IntegerType, nullable = false) + case STRING => SchemaType(StringType, nullable = false) + case BOOLEAN => SchemaType(BooleanType, nullable = false) + case BYTES => SchemaType(BinaryType, nullable = false) + case DOUBLE => SchemaType(DoubleType, nullable = false) + case FLOAT => SchemaType(FloatType, nullable = false) + case LONG => SchemaType(LongType, nullable = false) + case FIXED => SchemaType(BinaryType, nullable = false) + case ENUM => SchemaType(StringType, nullable = false) + + case RECORD => + val fields = avroSchema.getFields.asScala.toSeq.map { f => + val schemaType = toSqlType(f.schema()) + StructField(f.name, schemaType.dataType, schemaType.nullable) + } + + SchemaType(StructType(fields), nullable = false) + + case ARRAY => + val schemaType = toSqlType(avroSchema.getElementType) + SchemaType( + ArrayType(schemaType.dataType, containsNull = schemaType.nullable), + nullable = false + ) + + case MAP => + val schemaType = toSqlType(avroSchema.getValueType) + SchemaType( + MapType( + StringType, + schemaType.dataType, + valueContainsNull = schemaType.nullable + ), + nullable = false + ) + + case UNION => + if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { + // In case of a union with null, eliminate it and make a recursive call + val remainingUnionTypes = + avroSchema.getTypes.asScala.filterNot(_.getType == NULL) + if (remainingUnionTypes.size == 1) { + toSqlType(remainingUnionTypes.head).copy(nullable = true) + } else { + toSqlType(Schema.createUnion(remainingUnionTypes.asJava)) + .copy(nullable = true) + } + } else + avroSchema.getTypes.asScala.map(_.getType) match { + case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => + SchemaType(LongType, nullable = false) + case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => + SchemaType(DoubleType, nullable = false) + case other => + throw new SchemaConversionException( + s"This mix of union types is not supported: $other" + ) + } + + case other => + throw new SchemaConversionException(s"Unsupported type $other") + } + } + + /** This function converts sparkSQL StructType into avro schema. This method uses two other + * converter methods in order to do the conversion. + */ + private def convertStructToAvro[T]( + structType: StructType, + schemaBuilder: RecordBuilder[T], + recordNamespace: String + ): T = { + val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields() + structType.fields.foreach { field => + val newField = fieldsAssembler.name(field.name).`type`() + + if (field.nullable) { + convertFieldTypeToAvro( + field.dataType, + newField.nullable(), + field.name, + recordNamespace + ).noDefault + } else { + convertFieldTypeToAvro( + field.dataType, + newField, + field.name, + recordNamespace + ).noDefault + } + } + fieldsAssembler.endRecord() + } + + /** Returns a function that is used to convert avro types to their + * corresponding sparkSQL representations. + */ + def createConverterToSQL(schema: Schema): Any => Any = { + schema.getType match { + // Avro strings are in Utf8, so we have to call toString on them + case STRING | ENUM => + (item: Any) => if (item == null) null else item.toString + case INT | BOOLEAN | DOUBLE | FLOAT | LONG => identity + // Byte arrays are reused by avro, so we have to make a copy of them. 
+ case FIXED => + (item: Any) => + if (item == null) { + null + } else { + item.asInstanceOf[Fixed].bytes().clone() + } + case BYTES => + (item: Any) => + if (item == null) { + null + } else { + val bytes = item.asInstanceOf[ByteBuffer] + val javaBytes = new Array[Byte](bytes.remaining) + bytes.get(javaBytes) + javaBytes + } + case RECORD => + val fieldConverters = + schema.getFields.asScala.map(f => createConverterToSQL(f.schema)) + (item: Any) => + if (item == null) { + null + } else { + val record = item.asInstanceOf[GenericRecord] + val converted = new Array[Any](fieldConverters.size) + var idx = 0 + while (idx < fieldConverters.size) { + converted(idx) = fieldConverters.apply(idx)(record.get(idx)) + idx += 1 + } + Row.fromSeq(converted.toSeq) + } + case ARRAY => + val elementConverter = createConverterToSQL(schema.getElementType) + (item: Any) => + if (item == null) { + null + } else { + try { + item.asInstanceOf[GenericData.Array[Any]].asScala.map(elementConverter) + } catch { + case e: Throwable => + item.asInstanceOf[util.ArrayList[Any]].asScala.map(elementConverter) + } + } + case MAP => + val valueConverter = createConverterToSQL(schema.getValueType) + (item: Any) => + if (item == null) { + null + } else { + item + .asInstanceOf[util.HashMap[Any, Any]] + .asScala + .map(x => (x._1.toString, valueConverter(x._2))) + .toMap + } + case UNION => + if (schema.getTypes.asScala.exists(_.getType == NULL)) { + val remainingUnionTypes = schema.getTypes.asScala.filterNot(_.getType == NULL) + if (remainingUnionTypes.size == 1) { + createConverterToSQL(remainingUnionTypes.head) + } else { + createConverterToSQL(Schema.createUnion(remainingUnionTypes.asJava)) + } + } else + schema.getTypes.asScala.map(_.getType) match { + case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => + (item: Any) => { + item match { + case l: Long => l + case i: Int => i.toLong + case null => null + } + } + case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => + (item: Any) => { + item match { + case d: Double => d + case f: Float => f.toDouble + case null => null + } + } + case other => + throw new SchemaConversionException( + s"This mix of union types is not supported (see README): $other" + ) + } + case other => + throw new SchemaConversionException(s"invalid avro type: $other") + } + } + + /** This function is used to convert some sparkSQL type to avro type. Note that this function won't + * be used to construct fields of avro record (convertFieldTypeToAvro is used for that). 
+ */ + private def convertTypeToAvro[T]( + dataType: DataType, + schemaBuilder: BaseTypeBuilder[T], + structName: String, + recordNamespace: String + ): T = { + dataType match { + case ByteType => schemaBuilder.intType() + case ShortType => schemaBuilder.intType() + case IntegerType => schemaBuilder.intType() + case LongType => schemaBuilder.longType() + case FloatType => schemaBuilder.floatType() + case DoubleType => schemaBuilder.doubleType() + case _: DecimalType => schemaBuilder.stringType() + case StringType => schemaBuilder.stringType() + case BinaryType => schemaBuilder.bytesType() + case BooleanType => schemaBuilder.booleanType() + case TimestampType => schemaBuilder.longType() + + case ArrayType(elementType, _) => + val builder = getSchemaBuilder( + dataType.asInstanceOf[ArrayType].containsNull + ) + val elementSchema = + convertTypeToAvro(elementType, builder, structName, recordNamespace) + schemaBuilder.array().items(elementSchema) + + case MapType(StringType, valueType, _) => + val builder = getSchemaBuilder( + dataType.asInstanceOf[MapType].valueContainsNull + ) + val valueSchema = + convertTypeToAvro(valueType, builder, structName, recordNamespace) + schemaBuilder.map().values(valueSchema) + + case structType: StructType => + convertStructToAvro( + structType, + schemaBuilder.record(structName).namespace(recordNamespace), + recordNamespace + ) + + case other => + throw new IllegalArgumentException(s"Unexpected type $dataType.") + } + } + + /** This function is used to construct fields of the avro record, where schema of the field is + * specified by avro representation of dataType. Since builders for record fields are different + * from those for everything else, we have to use a separate method. + */ + private def convertFieldTypeToAvro[T]( + dataType: DataType, + newFieldBuilder: BaseFieldTypeBuilder[T], + structName: String, + recordNamespace: String + ): FieldDefault[T, _] = { + dataType match { + case ByteType => newFieldBuilder.intType() + case ShortType => newFieldBuilder.intType() + case IntegerType => newFieldBuilder.intType() + case LongType => newFieldBuilder.longType() + case FloatType => newFieldBuilder.floatType() + case DoubleType => newFieldBuilder.doubleType() + case _: DecimalType => newFieldBuilder.stringType() + case StringType => newFieldBuilder.stringType() + case BinaryType => newFieldBuilder.bytesType() + case BooleanType => newFieldBuilder.booleanType() + case TimestampType => newFieldBuilder.longType() + + case ArrayType(elementType, _) => + val builder = getSchemaBuilder( + dataType.asInstanceOf[ArrayType].containsNull + ) + val elementSchema = + convertTypeToAvro(elementType, builder, structName, recordNamespace) + newFieldBuilder.array().items(elementSchema) + + case MapType(StringType, valueType, _) => + val builder = getSchemaBuilder( + dataType.asInstanceOf[MapType].valueContainsNull + ) + val valueSchema = + convertTypeToAvro(valueType, builder, structName, recordNamespace) + newFieldBuilder.map().values(valueSchema) + + case structType: StructType => + convertStructToAvro( + structType, + newFieldBuilder.record(structName).namespace(recordNamespace), + recordNamespace + ) + + case other => + throw new IllegalArgumentException(s"Unexpected type $dataType.") + } + } + + private def getSchemaBuilder(isNullable: Boolean): BaseTypeBuilder[Schema] = { + if (isNullable) { + SchemaBuilder.builder().nullable() + } else { + SchemaBuilder.builder() + } + } + + /** This function constructs converter function for a given sparkSQL datatype. 
This is used when + * writing Avro records out to disk. + */ + def createConverterToAvro( + dataType: DataType, + structName: String, + recordNamespace: String + ): (Any) => Any = { + dataType match { + case BinaryType => + (item: Any) => + item match { + case null => null + case bytes: Array[Byte] => ByteBuffer.wrap(bytes) + } + case ByteType | ShortType | IntegerType | LongType | FloatType | + DoubleType | StringType | BooleanType => + identity + case _: DecimalType => + (item: Any) => if (item == null) null else item.toString + case TimestampType => + (item: Any) => + if (item == null) null else item.asInstanceOf[Timestamp].getTime + case ArrayType(elementType, _) => + val elementConverter = + createConverterToAvro(elementType, structName, recordNamespace) + (item: Any) => { + if (item == null) { + null + } else { + val sourceArray = item.asInstanceOf[Seq[Any]] + val sourceArraySize = sourceArray.size + val targetArray = new util.ArrayList[Any](sourceArraySize) + var idx = 0 + while (idx < sourceArraySize) { + targetArray.add(elementConverter(sourceArray(idx))) + idx += 1 + } + targetArray + } + } + case MapType(StringType, valueType, _) => + val valueConverter = + createConverterToAvro(valueType, structName, recordNamespace) + (item: Any) => { + if (item == null) { + null + } else { + val javaMap = new util.HashMap[String, Any]() + item.asInstanceOf[Map[String, Any]].foreach { case (key, value) => + javaMap.put(key, valueConverter(value)) + } + javaMap + } + } + case structType: StructType => + val builder = + SchemaBuilder.record(structName).namespace(recordNamespace) + val schema: Schema = SchemaConverters.convertStructToAvro( + structType, + builder, + recordNamespace + ) + val fieldConverters = structType.fields.map(field => + createConverterToAvro(field.dataType, field.name, recordNamespace) + ) + (item: Any) => { + if (item == null) { + null + } else { + val record = new Record(schema) + val convertersIterator = fieldConverters.iterator + val fieldNamesIterator = + dataType.asInstanceOf[StructType].fieldNames.iterator + val rowIterator = item.asInstanceOf[Row].toSeq.iterator + + while (convertersIterator.hasNext) { + val converter = convertersIterator.next() + record.put( + fieldNamesIterator.next(), + converter(rowIterator.next()) + ) + } + record + } + } + } + } +} + +@InterfaceAudience.Private +object AvroSerdes { + // For now we only handle the case where the top level is a record or a primitive type. + def serialize(input: Any, schema: Schema): Array[Byte] = { + schema.getType match { + case BOOLEAN => BytesConverter.toBytes(input.asInstanceOf[Boolean]) + case BYTES | FIXED => input.asInstanceOf[Array[Byte]] + case DOUBLE => BytesConverter.toBytes(input.asInstanceOf[Double]) + case FLOAT => BytesConverter.toBytes(input.asInstanceOf[Float]) + case INT => BytesConverter.toBytes(input.asInstanceOf[Int]) + case LONG => BytesConverter.toBytes(input.asInstanceOf[Long]) + case STRING => BytesConverter.toBytes(input.asInstanceOf[String]) + case RECORD => + val gr = input.asInstanceOf[GenericRecord] + val writer2 = new GenericDatumWriter[GenericRecord](schema) + val bao2 = new ByteArrayOutputStream() + val encoder2: BinaryEncoder = + EncoderFactory.get().directBinaryEncoder(bao2, null) + writer2.write(gr, encoder2) + bao2.toByteArray + case _ => throw new Exception(s"unsupported data type ${schema.getType}") + } + } + + def deserialize(input: Array[Byte], schema: Schema): GenericRecord = { + val reader2: DatumReader[GenericRecord] = + new GenericDatumReader[GenericRecord](schema) + val bai2 = new
ByteArrayInputStream(input) + val decoder2: BinaryDecoder = + DecoderFactory.get().directBinaryDecoder(bai2, null) + val gr2: GenericRecord = reader2.read(null, decoder2) + gr2 + } +} \ No newline at end of file
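
Note: the new SchemaConverters.scala above is the Scala 2.13 copy of the third-party converter code; it imports scala.jdk.CollectionConverters._, the 2.13 replacement for scala.collection.JavaConverters._. The sketches below are illustrative only and not part of the patch; class and artifact names come from this diff, everything else is assumed. First, a minimal toSqlType example showing how an Avro union with NULL becomes a nullable Spark SQL column:

    import org.apache.avro.SchemaBuilder
    import com.google.cloud.spark.bigtable.datasources.SchemaConverters

    val avroSchema = SchemaBuilder
      .record("Person").namespace("example")
      .fields()
      .requiredString("name") // STRING           -> StringType, non-nullable
      .optionalInt("age")     // UNION(NULL, INT) -> IntegerType, nullable
      .endRecord()

    val sql = SchemaConverters.toSqlType(avroSchema)
    // sql.dataType == StructType(Seq(
    //   StructField("name", StringType, nullable = false),
    //   StructField("age", IntegerType, nullable = true)))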
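
On the write side, createConverterToAvro builds a closure that turns Spark SQL values into Avro-compatible objects; for a StructType it yields a GenericData.Record. A sketch under the same assumptions:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
    import com.google.cloud.spark.bigtable.datasources.SchemaConverters

    val structType = StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("score", DoubleType, nullable = false)
    ))

    // "Example" / "example.ns" are arbitrary record name and namespace.
    val toAvro = SchemaConverters.createConverterToAvro(structType, "Example", "example.ns")
    val avroRecord = toAvro(Row("alice", 0.9)) // a GenericData.Record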
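
AvroSerdes pairs a direct binary encoder with the matching decoder, so a record serialized against a schema reads back unchanged:

    import org.apache.avro.SchemaBuilder
    import org.apache.avro.generic.GenericData
    import com.google.cloud.spark.bigtable.datasources.AvroSerdes

    val schema = SchemaBuilder.record("Cell").fields()
      .requiredLong("ts")
      .requiredString("value")
      .endRecord()

    val rec = new GenericData.Record(schema)
    rec.put("ts", 42L)
    rec.put("value", "hello")

    val bytes = AvroSerdes.serialize(rec, schema)
    val back = AvroSerdes.deserialize(bytes, schema)
    assert(back.get("value").toString == "hello")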
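
The examples/scala-sbt/build.sbt change near the top of this diff selects a Scala version by commenting one settings block out. An alternative sketch (assuming the spark-bigtable-scala2.12/2.13 artifact names introduced here) lets sbt cross-build both binary versions from a single file:

    name := "spark-bigtable-example"
    version := "0.1"
    scalaVersion := "2.13.14"
    crossScalaVersions := Seq("2.12.18", "2.13.14")

    val sparkVersion = "3.5.1"

    resolvers += Resolver.mavenLocal

    libraryDependencies ++= Seq(
      // scalaBinaryVersion.value is "2.12" or "2.13", matching the new artifact ids.
      "com.google.cloud.spark.bigtable" % s"spark-bigtable-scala${scalaBinaryVersion.value}" % "0.2.1",
      "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
      "org.slf4j" % "slf4j-reload4j" % "1.7.36"
    )

Running sbt +package (or +assembly, if sbt-assembly is configured) then builds the example once per cross version.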