Skip to content

Commit

Permalink
Merge pull request apache#33044 [yaml] SpannerIO docs and minor impro…
Browse files Browse the repository at this point in the history
…vments
  • Loading branch information
robertwb authored Nov 8, 2024
2 parents bd2b0e6 + 5ebfd82 commit 2488ca1
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@
* grouped into batches. The default maximum size of the batch is set to 1MB or 5000 mutated cells,
* or 500 rows (whichever is reached first). To override this use {@link
* Write#withBatchSizeBytes(long) withBatchSizeBytes()}, {@link Write#withMaxNumMutations(long)
* withMaxNumMutations()} or {@link Write#withMaxNumMutations(long) withMaxNumRows()}. Setting
* either to a small value or zero disables batching.
* withMaxNumMutations()} or {@link Write#withMaxNumRows(long) withMaxNumRows()}. Setting either to
* a small value or zero disables batching.
*
* <p>Note that the <a
* href="https://cloud.google.com/spanner/quotas#limits_for_creating_reading_updating_and_deleting_data">maximum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;

/** A provider for reading from Cloud Spanner using a Schema Transform Provider. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
Expand All @@ -54,43 +55,81 @@
*
* <p>The transformation leverages the {@link SpannerIO} to perform the read operation and maps the
* results to Beam rows, preserving the schema.
*
* <p>Example usage in a YAML pipeline using query:
*
* <pre>{@code
* pipeline:
* transforms:
* - type: ReadFromSpanner
* name: ReadShipments
* # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
* config:
* project_id: 'apache-beam-testing'
* instance_id: 'shipment-test'
* database_id: 'shipment'
* query: 'SELECT * FROM shipments'
* }</pre>
*
* <p>Example usage in a YAML pipeline using a table and columns:
*
* <pre>{@code
* pipeline:
* transforms:
* - type: ReadFromSpanner
* name: ReadShipments
* # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
* config:
* project_id: 'apache-beam-testing'
* instance_id: 'shipment-test'
* database_id: 'shipment'
* table: 'shipments'
* columns: ['customer_id', 'customer_name']
* }</pre>
*/
@AutoService(SchemaTransformProvider.class)
public class SpannerReadSchemaTransformProvider
extends TypedSchemaTransformProvider<
SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:spanner_read:v1";
}

@Override
public String description() {
return "Performs a Bulk read from Google Cloud Spanner using a specified SQL query or "
+ "by directly accessing a single table and its columns.\n"
+ "\n"
+ "Both Query and Read APIs are supported. See more information about "
+ "<a href=\"https://cloud.google.com/spanner/docs/reads\">reading from Cloud Spanner</a>.\n"
+ "\n"
+ "Example configuration for performing a read using a SQL query: ::\n"
+ "\n"
+ " pipeline:\n"
+ " transforms:\n"
+ " - type: ReadFromSpanner\n"
+ " config:\n"
+ " instance_id: 'my-instance-id'\n"
+ " database_id: 'my-database'\n"
+ " query: 'SELECT * FROM table'\n"
+ "\n"
+ "It is also possible to read a table by specifying a table name and a list of columns. For "
+ "example, the following configuration will perform a read on an entire table: ::\n"
+ "\n"
+ " pipeline:\n"
+ " transforms:\n"
+ " - type: ReadFromSpanner\n"
+ " config:\n"
+ " instance_id: 'my-instance-id'\n"
+ " database_id: 'my-database'\n"
+ " table: 'my-table'\n"
+ " columns: ['col1', 'col2']\n"
+ "\n"
+ "Additionally, to read using a <a href=\"https://cloud.google.com/spanner/docs/secondary-indexes\">"
+ "Secondary Index</a>, specify the index name: ::"
+ "\n"
+ " pipeline:\n"
+ " transforms:\n"
+ " - type: ReadFromSpanner\n"
+ " config:\n"
+ " instance_id: 'my-instance-id'\n"
+ " database_id: 'my-database'\n"
+ " table: 'my-table'\n"
+ " index: 'my-index'\n"
+ " columns: ['col1', 'col2']\n"
+ "\n"
+ "### Advanced Usage\n"
+ "\n"
+ "Reads by default use the <a href=\"https://cloud.google.com/spanner/docs/reads#read_data_in_parallel\">"
+ "PartitionQuery API</a> which enforces some limitations on the type of queries that can be used so that "
+ "the data can be read in parallel. If the query is not supported by the PartitionQuery API, then you "
+ "can specify a non-partitioned read by setting batching to false.\n"
+ "\n"
+ "For example: ::"
+ "\n"
+ " pipeline:\n"
+ " transforms:\n"
+ " - type: ReadFromSpanner\n"
+ " config:\n"
+ " batching: false\n"
+ " ...\n"
+ "\n"
+ "Note: See <a href=\""
+ "https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.html\">"
+ "SpannerIO</a> for more advanced information.";
}

static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
private final SpannerReadSchemaTransformConfiguration configuration;

Expand All @@ -113,6 +152,12 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
} else {
read = read.withTable(configuration.getTableId()).withColumns(configuration.getColumns());
}
if (!Strings.isNullOrEmpty(configuration.getIndex())) {
read = read.withIndex(configuration.getIndex());
}
if (Boolean.FALSE.equals(configuration.getBatching())) {
read = read.withBatching(false);
}
PCollection<Struct> spannerRows = input.getPipeline().apply(read);
Schema schema = spannerRows.getSchema();
PCollection<Row> rows =
Expand All @@ -124,11 +169,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}
}

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:spanner_read:v1";
}

@Override
public List<String> inputCollectionNames() {
return Collections.emptyList();
Expand Down Expand Up @@ -157,6 +197,10 @@ public abstract static class Builder {

public abstract Builder setColumns(List<String> columns);

public abstract Builder setIndex(String index);

public abstract Builder setBatching(Boolean batching);

public abstract SpannerReadSchemaTransformConfiguration build();
}

Expand Down Expand Up @@ -193,16 +237,16 @@ public static Builder builder() {
.Builder();
}

@SchemaFieldDescription("Specifies the GCP project ID.")
@Nullable
public abstract String getProjectId();

@SchemaFieldDescription("Specifies the Cloud Spanner instance.")
public abstract String getInstanceId();

@SchemaFieldDescription("Specifies the Cloud Spanner database.")
public abstract String getDatabaseId();

@SchemaFieldDescription("Specifies the GCP project ID.")
@Nullable
public abstract String getProjectId();

@SchemaFieldDescription("Specifies the Cloud Spanner table.")
@Nullable
public abstract String getTableId();
Expand All @@ -211,9 +255,20 @@ public static Builder builder() {
@Nullable
public abstract String getQuery();

@SchemaFieldDescription("Specifies the columns to read from the table.")
@SchemaFieldDescription(
"Specifies the columns to read from the table. This parameter is required when table is specified.")
@Nullable
public abstract List<String> getColumns();

@SchemaFieldDescription(
"Specifies the Index to read from. This parameter can only be specified when using table.")
@Nullable
public abstract String getIndex();

@SchemaFieldDescription(
"Set to false to disable batching. Useful when using a query that is not compatible with the PartitionQuery API. Defaults to true.")
@Nullable
public abstract Boolean getBatching();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,49 +67,37 @@
* <p>The transformation uses the {@link SpannerIO} to perform the write operation and provides
* options to handle failed mutations, either by throwing an error, or passing the failed mutation
* further in the pipeline for dealing with accordingly.
*
* <p>Example usage in a YAML pipeline without error handling:
*
* <pre>{@code
* pipeline:
* transforms:
* - type: WriteToSpanner
* name: WriteShipments
* config:
* project_id: 'apache-beam-testing'
* instance_id: 'shipment-test'
* database_id: 'shipment'
* table_id: 'shipments'
*
* }</pre>
*
* <p>Example usage in a YAML pipeline using error handling:
*
* <pre>{@code
* pipeline:
* transforms:
* - type: WriteToSpanner
* name: WriteShipments
* config:
* project_id: 'apache-beam-testing'
* instance_id: 'shipment-test'
* database_id: 'shipment'
* table_id: 'shipments'
* error_handling:
* output: 'errors'
*
* - type: WriteToJson
* input: WriteSpanner.my_error_output
* config:
* path: errors.json
*
* }</pre>
*/
@AutoService(SchemaTransformProvider.class)
public class SpannerWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<
SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration> {

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:spanner_write:v1";
}

@Override
public String description() {
return "Performs a bulk write to a Google Cloud Spanner table.\n"
+ "\n"
+ "Example configuration for performing a write to a single table: ::\n"
+ "\n"
+ " pipeline:\n"
+ " transforms:\n"
+ " - type: ReadFromSpanner\n"
+ " config:\n"
+ " project_id: 'my-project-id'\n"
+ " instance_id: 'my-instance-id'\n"
+ " database_id: 'my-database'\n"
+ " table: 'my-table'\n"
+ "\n"
+ "Note: See <a href=\""
+ "https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.html\">"
+ "SpannerIO</a> for more advanced information.";
}

@Override
protected Class<SpannerWriteSchemaTransformConfiguration> configurationClass() {
return SpannerWriteSchemaTransformConfiguration.class;
Expand Down Expand Up @@ -225,11 +213,6 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
}
}

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:spanner_write:v1";
}

@Override
public List<String> inputCollectionNames() {
return Collections.singletonList("input");
Expand All @@ -244,10 +227,6 @@ public List<String> outputCollectionNames() {
@DefaultSchema(AutoValueSchema.class)
public abstract static class SpannerWriteSchemaTransformConfiguration implements Serializable {

@SchemaFieldDescription("Specifies the GCP project.")
@Nullable
public abstract String getProjectId();

@SchemaFieldDescription("Specifies the Cloud Spanner instance.")
public abstract String getInstanceId();

Expand All @@ -257,7 +236,11 @@ public abstract static class SpannerWriteSchemaTransformConfiguration implements
@SchemaFieldDescription("Specifies the Cloud Spanner table.")
public abstract String getTableId();

@SchemaFieldDescription("Specifies how to handle errors.")
@SchemaFieldDescription("Specifies the GCP project.")
@Nullable
public abstract String getProjectId();

@SchemaFieldDescription("Whether and how to handle write errors.")
@Nullable
public abstract ErrorHandling getErrorHandling();

Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@
table: 'table_id'
query: 'query'
columns: 'columns'
index: 'index'
batching: 'batching'
'WriteToSpanner':
project: 'project_id'
instance: 'instance_id'
Expand Down

0 comments on commit 2488ca1

Please sign in to comment.