The connector supports reading Google Cloud Spanner tables into Spark DataFrames. It does this by using the Spark SQL Data Source API to communicate with the Spanner Java library.
If you don't already have a project or a Spanner table, follow the instructions to create them.
If you do not have an Apache Spark environment, you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use `spark-submit` on any cluster.
Any Dataproc cluster using the API needs the 'spanner' or 'cloud-platform' scopes. Dataproc clusters don't have the 'spanner' scope by default, but you can create a cluster with it. For example:
```sh
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER" --scopes https://www.googleapis.com/auth/cloud-platform
```
If you run a Spark job on a Dataproc cluster, you'll have to grant the corresponding Spanner permissions to the Dataproc VM service account. If you choose to use Dataproc Serverless, you'll have to make sure the Serverless service account has those permissions.
You can find the released jar file under the Releases tag on the right of the GitHub page. The name pattern is `spark-3.1-spanner-x.x.x.jar`, where `3.1` indicates that the driver depends on Spark 3.1 and `x.x.x` is the Spark Spanner connector version. Alternatively, you can use `gs://spark-lib/spanner/spark-3.1-spanner-1.1.0.jar` directly.
| Connector \ Spark | 2.3 | 2.4 (Scala 2.11) | 2.4 (Scala 2.12) | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
|---|---|---|---|---|---|---|---|---|---|
| spark-3.1-spanner | | | | | ✓ | ✓ | ✓ | ✓ | ✓ |
| Connector \ Dataproc Image | 1.3 | 1.4 | 1.5 | 2.0 | 2.1 | 2.2 | Serverless Image 1.0 | Serverless Image 2.0 | Serverless Image 2.1 |
|---|---|---|---|---|---|---|---|---|---|
| spark-3.1-spanner | | | | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
The connector is not available on Maven Central yet.
You can use the standard `--jars` or `--packages` flags (or alternatively, the `spark.jars`/`spark.jars.packages` configuration) to specify the Spark Spanner connector. For example:
```sh
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
  --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.1.0.jar \
  --region us-central1 examples/SpannerSpark.py
```
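As a minimal sketch of the `spark.jars` alternative mentioned above, the jar can also be specified when building the session. Note that jar configurations generally need to take effect before the driver JVM starts, so passing `--jars` at submit time is the more reliable route on a cluster:

```python
from pyspark.sql import SparkSession

# Sketch: point spark.jars at the released connector artifact referenced
# above instead of passing --jars on the command line.
spark = (
    SparkSession.builder
    .appName('Spanner Connect App')
    .config('spark.jars', 'gs://spark-lib/spanner/spark-3.1-spanner-1.1.0.jar')
    .getOrCreate()
)
```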
The connector uses the cross-language Spark SQL Data Source API. The following example uses Python to connect to a Spanner table; you can find more examples and documentation on usage.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Connect App').getOrCreate()

# Read a Spanner table into a DataFrame; replace the $-placeholders
# with your own project, instance, database, and table names.
df = spark.read.format('cloud-spanner') \
  .option("projectId", "$YourProjectId") \
  .option("instanceId", "$YourInstanceId") \
  .option("databaseId", "$YourDatabaseId") \
  .option("table", "$YourTable") \
  .load()
df.show()
```
For support in other languages, refer to the Scala, Java, and R examples; they also cover how to submit a job in each of those languages.
Here are the options supported in the Spark Spanner connector.
| Variable | Validation | Comments |
|---|---|---|
| projectId | String | The project ID containing the Cloud Spanner database |
| instanceId | String | The instance ID of the Cloud Spanner database |
| databaseId | String | The database ID of the Cloud Spanner database |
| table | String | The table of the Cloud Spanner database that you are reading from |
| enableDataboost | Boolean | Enable Data Boost, which provides independent compute resources to query Spanner with near-zero impact on existing workloads. Note that this option may incur extra charges. |
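As a sketch of how these options fit together, a Data Boost read might look like the following (the project, instance, database, and table names are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Databoost Read').getOrCreate()

# Placeholders throughout: substitute your own identifiers.
df = (
    spark.read.format('cloud-spanner')
    .option("projectId", "my-project")     # placeholder
    .option("instanceId", "my-instance")   # placeholder
    .option("databaseId", "my-database")   # placeholder
    .option("enableDataboost", "true")     # may incur extra Data Boost charges
    .option("table", "my_table")           # placeholder
    .load()
)
df.show()
```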
Here are the mappings for supported Spanner data types.
| Spanner GoogleSQL Type | Spark Data Type | Notes |
|---|---|---|
| ARRAY | ArrayType | Nested ARRAY is not supported, e.g. `ARRAY<ARRAY>`. |
| BOOL | BooleanType | |
| BYTES | BinaryType | |
| DATE | DateType | The date range is [1700-01-01, 9999-12-31]. |
| FLOAT64 | DoubleType | |
| INT64 | LongType | The supported integer range is [-9,223,372,036,854,775,808, 9,223,372,036,854,775,807]. |
| JSON | StringType | Spark has no JSON type. The values are read as String. |
| NUMERIC | DecimalType | NUMERIC is converted to DecimalType with precision 38 and scale 9, which is the same as the Spanner definition. |
| STRING | StringType | |
| TIMESTAMP | TimestampType | Only microseconds are converted to the Spark timestamp type. The timestamp range is [0001-01-01 00:00:00, 9999-12-31 23:59:59.999999]. |
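One way to see these mappings in practice is to print the schema of a loaded DataFrame. A sketch, using a hypothetical table with INT64 and NUMERIC columns:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Schema Check').getOrCreate()

# Hypothetical table "orders"; identifiers below are placeholders.
df = (
    spark.read.format('cloud-spanner')
    .option("projectId", "my-project")
    .option("instanceId", "my-instance")
    .option("databaseId", "my-database")
    .option("table", "orders")
    .load()
)
df.printSchema()
# An INT64 column surfaces as long, and a NUMERIC column as decimal(38,9),
# printing something like:
# root
#  |-- order_id: long (nullable = false)
#  |-- total: decimal(38,9) (nullable = true)
```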
The connector automatically performs column pruning and filter pushdown based on the DataFrame's SELECT statement. For example,

```python
df.select("word") \
  .where("word = 'Hamlet' or word = 'Claudius'") \
  .collect()
```

prunes the read to the column `word` and pushes down the predicate filter `word = 'Hamlet' or word = 'Claudius'`. Note that filters containing an ArrayType column are not pushed down.
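One way to check what was pruned and pushed down is Spark's physical plan. A sketch, reusing `df` from the snippet above:

```python
# explain() prints the physical plan; for a Data Source V2 read, the scan
# node typically lists the selected columns and the pushed filters, so you
# can confirm the predicate on `word` reached Spanner rather than being
# evaluated in Spark.
df.select("word") \
  .where("word = 'Hamlet' or word = 'Claudius'") \
  .explain()
```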
When Data Boost is enabled, its usage can be monitored using Cloud Monitoring. The page explains how to do that step by step. Note, however, that usage cannot be grouped by Spark job ID.
The Dataproc web interface can be used for debugging, and especially for performance tuning. The YARN Application Timeline page displays execution timeline details for the executors and other functions. You can assign more workers if many tasks are assigned to the same executor.
When Data Boost is enabled, all queries fed into Cloud Spanner must be root-partitionable. See Read data in parallel for more details. If you encounter a partitioning-related issue when using this connector, it is likely that the table being read is not supported.
The connector supports Spanner databases with the PostgreSQL interface enabled.
| Spanner PostgreSQL Type | Spark Data Type | Notes |
|---|---|---|
| array | ArrayType | Nested array is not supported. |
| bool / boolean | BooleanType | |
| bytea | BinaryType | |
| date | DateType | The date range is [1700-01-01, 9999-12-31]. |
| double precision / float8 | DoubleType | |
| int8 / bigint | LongType | The supported integer range is [-9,223,372,036,854,775,808, 9,223,372,036,854,775,807]. |
| jsonb | StringType | Spark has no JSON type. The values are read as String. |
| numeric / decimal | DecimalType | numeric is converted to DecimalType with precision 38 and scale 9, which is the same as the Spanner definition. |
| varchar / text / character varying | StringType | |
| timestamptz / timestamp with time zone | TimestampType | Only microseconds are converted to the Spark timestamp type. The timestamp range is [0001-01-01 00:00:00, 9999-12-31 23:59:59.999999]. |
Since jsonb is converted to StringType in Spark, a filter containing a jsonb column can only be pushed down as a string filter. For jsonb columns, the IN filter is not pushed down to Cloud Spanner. Filters containing an array column will not be pushed down.
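As an illustrative sketch of this behavior (the table and column names below are hypothetical), an equality filter on a jsonb column can be pushed down as a plain string comparison, while an IN filter on the same column is evaluated by Spark after the rows are read:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner PG Read').getOrCreate()

# Hypothetical PostgreSQL-interface table "events" with a jsonb column "payload".
df = (
    spark.read.format('cloud-spanner')
    .option("projectId", "my-project")
    .option("instanceId", "my-instance")
    .option("databaseId", "my-pg-database")
    .option("table", "events")
    .load()
)

click = '{"kind": "click"}'

# Equality on the jsonb column can be pushed down as a string filter.
df.where(f"payload = '{click}'").show()

# IN filters on jsonb columns are not pushed down; Spark applies them
# after reading the rows.
df.where(f"payload IN ('{click}')").show()
```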