diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java
index d9ae1d9435..8d58517b89 100644
--- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java
+++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java
@@ -27,6 +27,14 @@
  */
 public interface SpannerChangeStreamsToBigQueryOptions extends DataflowPipelineOptions {
 
+  @Description(
+      "Project to read change streams from. The default for this parameter is the project where the"
+          + " Dataflow pipeline is running.")
+  @Default.String("")
+  String getSpannerProjectId();
+
+  void setSpannerProjectId(String projectId);
+
   @Description("The Spanner instance ID that contains the Change Stream.")
   @Validation.Required
   String getSpannerInstanceId();
@@ -51,11 +59,19 @@ public interface SpannerChangeStreamsToBigQueryO
 
   void setSpannerMetadataDatabaseId(String value);
 
+  @Description(
+      "The Cloud Spanner change streams Connector metadata table name to use. If not provided, a"
+          + " Cloud Spanner change streams Connector metadata table will automatically be created"
+          + " during the pipeline flow.")
+  String getSpannerMetadataTableName();
+
+  void setSpannerMetadataTableName(String value);
+
   @Description("The name of the Spanner Change Stream.")
   @Validation.Required
-  String getSpannerChangeStream();
+  String getSpannerChangeStreamName();
 
-  void setSpannerChangeStream(String value);
+  void setSpannerChangeStreamName(String value);
 
   @Description(
       "Priority for Spanner RPC invocations. Defaults to HIGH. Allowed priorities are LOW, MEDIUM,"
diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/FailsafeModJsonToTableRowTransformer.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/FailsafeModJsonToTableRowTransformer.java
index a8b358c621..738e7ecb34 100644
--- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/FailsafeModJsonToTableRowTransformer.java
+++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/FailsafeModJsonToTableRowTransformer.java
@@ -15,12 +15,16 @@
  */
 package com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery;
 
+import com.google.api.gax.grpc.GrpcCallContext;
+import com.google.api.gax.rpc.ApiCallContext;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.auto.value.AutoValue;
 import com.google.cloud.spanner.Key.Builder;
 import com.google.cloud.spanner.KeySet;
 import com.google.cloud.spanner.Options;
 import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
 import com.google.cloud.spanner.TimestampBound;
 import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
 import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.Mod;
@@ -30,11 +34,17 @@
 import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerToBigQueryUtils;
 import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerUtils;
 import com.google.cloud.teleport.v2.values.FailsafeElement;
+import io.grpc.CallOptions;
+import io.grpc.Context;
+import io.grpc.MethodDescriptor;
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
 import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
@@ -50,6 +60,8 @@
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.node.ObjectNode;
 import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class {@link FailsafeModJsonToTableRowTransformer} provides methods that convert a {@link Mod}
@@ -57,6 +69,9 @@
  */
 public final class FailsafeModJsonToTableRowTransformer {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FailsafeModJsonToTableRowTransformer.class);
+
   /**
    * Primary class for taking a {@link FailsafeElement} {@link Mod} JSON input and converting to a
    * {@link TableRow}.
@@ -107,6 +122,7 @@ public static class FailsafeModJsonToTableRowFn
     private final Set<String> ignoreFields;
     public TupleTag<TableRow> transformOut;
     public TupleTag<FailsafeElement<String, String>> transformDeadLetterOut;
+    private transient CallContextConfigurator callContextConfigurator;
 
     public FailsafeModJsonToTableRowFn(
         SpannerConfig spannerConfig,
@@ -124,12 +140,30 @@ public FailsafeModJsonToTableRowFn(
       }
     }
 
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
+      setUpCallContextConfigurator();
+    }
+
+    private void setUpCallContextConfigurator() {
+      callContextConfigurator =
+          new CallContextConfigurator() {
+            public <ReqT, RespT> ApiCallContext configure(
+                ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
+              return GrpcCallContext.createDefault()
+                  .withCallOptions(CallOptions.DEFAULT.withDeadlineAfter(120L, TimeUnit.SECONDS));
+            }
+          };
+    }
+
     @Setup
     public void setUp() {
       spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
       spannerTableByName =
           new SpannerUtils(spannerAccessor.getDatabaseClient(), spannerChangeStream)
               .getSpannerTableByName();
+      setUpCallContextConfigurator();
     }
 
     @Teardown
@@ -230,26 +264,71 @@ private TableRow modJsonStringToTableRow(String modJsonString) throws Exception
              .map(spannerNonPkColumn -> spannerNonPkColumn.getName())
              .collect(Collectors.toList());
 
-      Options.ReadQueryUpdateTransactionOption options =
-          Options.priority(spannerConfig.getRpcPriority().get());
-      // We assume the Spanner schema isn't changed while the pipeline is running, so the read is
-      // expected to succeed in normal cases. The schema change is currently not supported.
-      try (ResultSet resultSet =
-          spannerAccessor
-              .getDatabaseClient()
-              .singleUseReadOnlyTransaction(
-                  TimestampBound.ofReadTimestamp(spannerCommitTimestamp))
-              .read(
-                  spannerTable.getTableName(),
-                  KeySet.singleKey(keyBuilder.build()),
-                  spannerNonPkColumnNames,
-                  options)) {
-        SpannerToBigQueryUtils.spannerSnapshotRowToBigQueryTableRow(
-            resultSet, spannerNonPkColumns, tableRow);
+      int retryCount = 0;
+      while (true) {
+        try {
+          readSpannerRow(
+              spannerTable.getTableName(),
+              keyBuilder.build(),
+              spannerNonPkColumns,
+              spannerNonPkColumnNames,
+              spannerCommitTimestamp,
+              tableRow);
+          break;
+        } catch (Exception e) {
+          // Retry up to 3 times in case of transient errors.
+          if (retryCount > 3) {
+            throw e;
+          } else {
+            LOG.error(
+                "Caught exception from Spanner snapshot read: {}, stack trace: {}, current retry"
+                    + " count: {}",
+                e,
+                e.getStackTrace(),
+                retryCount);
+            // Wait for 1 second before the next retry.
+            TimeUnit.SECONDS.sleep(1);
+            retryCount++;
+          }
+        }
       }
 
       return tableRow;
     }
+
+    // Do a Spanner read to retrieve the full row. Schema changes are currently not supported, so
+    // we assume the schema isn't changed while the pipeline is running.
+    private void readSpannerRow(
+        String spannerTableName,
+        com.google.cloud.spanner.Key key,
+        List<TrackedSpannerColumn> spannerNonPkColumns,
+        List<String> spannerNonPkColumnNames,
+        com.google.cloud.Timestamp spannerCommitTimestamp,
+        TableRow tableRow) {
+      Options.ReadQueryUpdateTransactionOption options =
+          Options.priority(spannerConfig.getRpcPriority().get());
+      // Create a context that uses the custom call configuration.
+      Context context =
+          Context.current()
+              .withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, callContextConfigurator);
+      // Do the snapshot read in the custom context.
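+      // callContextConfigurator (set up in setUp()/readObject()) puts a 120-second gRPC deadline
+      // on this read, so a stuck RPC fails fast and can be retried by modJsonStringToTableRow.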
+      context.run(
+          () -> {
+            try (ResultSet resultSet =
+                spannerAccessor
+                    .getDatabaseClient()
+                    .singleUseReadOnlyTransaction(
+                        TimestampBound.ofReadTimestamp(spannerCommitTimestamp))
+                    .read(
+                        spannerTableName,
+                        KeySet.singleKey(key),
+                        spannerNonPkColumnNames,
+                        options)) {
+              SpannerToBigQueryUtils.spannerSnapshotRowToBigQueryTableRow(
+                  resultSet, spannerNonPkColumns, tableRow);
+            }
+          });
+    }
   }
 }
diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java
index 690a181ec9..af8a8e1c42 100644
--- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java
+++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java
@@ -149,6 +149,7 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options)
      */
     Pipeline pipeline = Pipeline.create(options);
     DeadLetterQueueManager dlqManager = buildDlqManager(options);
+    String spannerProjectId = getSpannerProjectId(options);
 
     String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
     String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";
@@ -166,7 +167,7 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options)
     SpannerConfig spannerConfig =
         SpannerConfig.create()
             .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
-            .withProjectId(options.getProject())
+            .withProjectId(spannerProjectId)
             .withInstanceId(options.getSpannerInstanceId())
             .withDatabaseId(options.getSpannerDatabaseId())
             .withRpcPriority(options.getSpannerRpcPriority());
@@ -176,11 +177,16 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options)
             .withSpannerConfig(spannerConfig)
             .withMetadataInstance(options.getSpannerMetadataInstanceId())
             .withMetadataDatabase(options.getSpannerMetadataDatabaseId())
-            .withChangeStreamName(options.getSpannerChangeStream())
+            .withChangeStreamName(options.getSpannerChangeStreamName())
             .withInclusiveStartAt(startTimestamp)
             .withInclusiveEndAt(endTimestamp)
             .withRpcPriority(options.getSpannerRpcPriority());
 
+    String spannerMetadataTableName = options.getSpannerMetadataTableName();
+    if (spannerMetadataTableName != null) {
+      readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName);
+    }
+
     PCollection<DataChangeRecord> dataChangeRecord =
         pipeline
             .apply("Read from Spanner Change Streams", readChangeStream)
@@ -217,7 +223,7 @@ public void process(
         failsafeModJsonToTableRowOptions =
             FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions.builder()
                 .setSpannerConfig(spannerConfig)
-                .setSpannerChangeStream(options.getSpannerChangeStream())
+                .setSpannerChangeStream(options.getSpannerChangeStreamName())
                 .setIgnoreFields(options.getIgnoreFields())
                 .setCoder(FAILSAFE_ELEMENT_CODER)
                 .build();
@@ -232,8 +238,8 @@ public void process(
         bigQueryDynamicDestinationsOptions =
             BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions.builder()
                 .setSpannerConfig(spannerConfig)
-                .setChangeStreamName(options.getSpannerChangeStream())
-                .setBigQueryProject(getBigQueryProject(options))
+                .setChangeStreamName(options.getSpannerChangeStreamName())
+                .setBigQueryProject(getBigQueryProjectId(options))
                 .setBigQueryDataset(options.getBigQueryDataset())
                 .setBigQueryTableTemplate(options.getBigQueryChangelogTableNameTemplate())
                 .build();
@@ -274,6 +280,7 @@ public void process(
             DLQWriteTransform.WriteDLQ.newBuilder()
                 .withDlqDirectory(dlqDirectory)
                 .withTmpDirectory(tempDlqDirectory)
+                .setIncludePaneInfo(true)
                 .build());
 
     PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
@@ -288,6 +295,7 @@ public void process(
             DLQWriteTransform.WriteDLQ.newBuilder()
                 .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                 .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
+                .setIncludePaneInfo(true)
                 .build());
 
     return pipeline.run();
@@ -306,7 +314,13 @@ private static DeadLetterQueueManager buildDlqManager(
     return DeadLetterQueueManager.create(dlqDirectory, DLQ_MAX_RETRIES);
   }
 
-  private static String getBigQueryProject(SpannerChangeStreamsToBigQueryOptions options) {
+  private static String getSpannerProjectId(SpannerChangeStreamsToBigQueryOptions options) {
+    return options.getSpannerProjectId().isEmpty()
+        ? options.getProject()
+        : options.getSpannerProjectId();
+  }
+
+  private static String getBigQueryProjectId(SpannerChangeStreamsToBigQueryOptions options) {
     return options.getBigQueryProjectId().isEmpty()
         ? options.getProject()
        : options.getBigQueryProjectId();