Skip to content

Commit

Permalink
Some fixes for Spanner change streams to BigQuery Dataflow template
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 447528974
  • Loading branch information
cloud-teleport authored and pranavbhandari24 committed May 10, 2022
1 parent fb68cfa commit 89f0074
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
*/
public interface SpannerChangeStreamsToBigQueryOptions extends DataflowPipelineOptions {

// Optional. The empty-string default is a sentinel meaning "fall back to the Dataflow
// job's own project" — the pipeline resolves it via getProject() when unset.
@Description(
    "Project to read change streams from. The default for this parameter is the project where the"
        + " Dataflow pipeline is running.")
@Default.String("")
String getSpannerProjectId();

void setSpannerProjectId(String projectId);

@Description("The Spanner instance ID that contains the Change Stream.")
@Validation.Required
String getSpannerInstanceId();
Expand All @@ -51,11 +59,19 @@ public interface SpannerChangeStreamsToBigQueryOptions extends DataflowPipelineO

void setSpannerMetadataDatabaseId(String value);

// Optional. When left null, the pipeline lets the change streams Connector create
// its own metadata table; a non-null value is passed through withMetadataTable(...).
@Description(
    "The Cloud Spanner change streams Connector metadata table name to use. If not provided, a"
        + " Cloud Spanner change streams Connector metadata table will automatically be created"
        + " during the pipeline flow.")
String getSpannerMetadataTableName();

void setSpannerMetadataTableName(String value);

@Description("The name of the Spanner Change Stream.")
@Validation.Required
String getSpannerChangeStream();
String getSpannerChangeStreamName();

void setSpannerChangeStream(String value);
void setSpannerChangeStreamName(String value);

@Description(
"Priority for Spanner RPC invocations. Defaults to HIGH. Allowed priorites are LOW, MEDIUM,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
*/
package com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.Key.Builder;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.Mod;
Expand All @@ -30,11 +34,17 @@
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerToBigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.SpannerUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.MethodDescriptor;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
Expand All @@ -50,13 +60,18 @@
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class {@link FailsafeModJsonToTableRowTransformer} provides methods that convert a {@link Mod}
* JSON string wrapped in {@link FailsafeElement} to a {@link TableRow}.
*/
public final class FailsafeModJsonToTableRowTransformer {

private static final Logger LOG =
LoggerFactory.getLogger(FailsafeModJsonToTableRowTransformer.class);

/**
* Primary class for taking a {@link FailsafeElement} {@link Mod} JSON input and converting to a
* {@link TableRow}.
Expand Down Expand Up @@ -107,6 +122,7 @@ public static class FailsafeModJsonToTableRowFn
private final Set<String> ignoreFields;
public TupleTag<TableRow> transformOut;
public TupleTag<FailsafeElement<String, String>> transformDeadLetterOut;
private transient CallContextConfigurator callContextConfigurator;

public FailsafeModJsonToTableRowFn(
SpannerConfig spannerConfig,
Expand All @@ -124,12 +140,30 @@ public FailsafeModJsonToTableRowFn(
}
}

// Restores transient state after Java deserialization of this DoFn: the SpannerAccessor
// and the gRPC call-context configurator are rebuilt here because they are not carried
// across serialization.
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
  in.defaultReadObject();
  // Re-acquire the accessor for the configured Spanner database (getOrCreate may
  // return a shared/cached instance — see SpannerAccessor docs; confirm if relied upon).
  spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
  // Rebuild the transient callContextConfigurator used by readSpannerRow.
  setUpCallContextConfigurator();
}

/**
 * Initializes the transient {@code callContextConfigurator} that attaches a per-RPC
 * deadline to every gRPC call issued while the configurator is installed in the gRPC
 * {@code Context} (it is installed by {@code readSpannerRow} around the snapshot read).
 * Without a deadline, a stuck Spanner read could block the worker indefinitely.
 */
private void setUpCallContextConfigurator() {
  // Named instead of a bare magic number; applied to each individual RPC.
  final long rpcDeadlineSeconds = 120L;
  callContextConfigurator =
      new CallContextConfigurator() {
        @Override
        public <ReqT, RespT> ApiCallContext configure(
            ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
          // Ignore the incoming context/request/method and always return a default
          // gRPC call context carrying the deadline.
          return GrpcCallContext.createDefault()
              .withCallOptions(
                  CallOptions.DEFAULT.withDeadlineAfter(rpcDeadlineSeconds, TimeUnit.SECONDS));
        }
      };
}

// Beam lifecycle hook: prepares per-worker state before any bundle is processed.
@Setup
public void setUp() {
  // Acquire the Spanner client accessor for the configured database.
  spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
  // Build the table-name -> tracked-table lookup for the monitored change stream.
  spannerTableByName =
      new SpannerUtils(spannerAccessor.getDatabaseClient(), spannerChangeStream)
          .getSpannerTableByName();
  // Prepare the gRPC deadline configurator used for snapshot reads.
  setUpCallContextConfigurator();
}

@Teardown
Expand Down Expand Up @@ -230,26 +264,71 @@ private TableRow modJsonStringToTableRow(String modJsonString) throws Exception
.map(spannerNonPkColumn -> spannerNonPkColumn.getName())
.collect(Collectors.toList());

Options.ReadQueryUpdateTransactionOption options =
Options.priority(spannerConfig.getRpcPriority().get());
// We assume the Spanner schema isn't changed while the pipeline is running, so the read is
// expected to succeed in normal cases. The schema change is currently not supported.
try (ResultSet resultSet =
spannerAccessor
.getDatabaseClient()
.singleUseReadOnlyTransaction(
TimestampBound.ofReadTimestamp(spannerCommitTimestamp))
.read(
spannerTable.getTableName(),
KeySet.singleKey(keyBuilder.build()),
spannerNonPkColumnNames,
options)) {
SpannerToBigQueryUtils.spannerSnapshotRowToBigQueryTableRow(
resultSet, spannerNonPkColumns, tableRow);
int retryCount = 0;
while (true) {
try {
readSpannerRow(
spannerTable.getTableName(),
keyBuilder.build(),
spannerNonPkColumns,
spannerNonPkColumnNames,
spannerCommitTimestamp,
tableRow);
break;
} catch (Exception e) {
// Retry for maximum 3 times in case of transient error.
if (retryCount > 3) {
throw e;
} else {
LOG.error(
"Caught exception from Spanner snapshot read: {}, stack trace:{} current retry"
+ " count: {}",
e,
e.getStackTrace(),
retryCount);
// Wait for 1 seconds before next retry.
TimeUnit.SECONDS.sleep(1);
retryCount++;
}
}
}

return tableRow;
}

// Reads the full row identified by {@code key} from Spanner at the change's commit
// timestamp and fills {@code tableRow} with the non-primary-key columns. Schema changes
// while the pipeline is running are not supported, so the table layout is assumed stable.
private void readSpannerRow(
    String spannerTableName,
    com.google.cloud.spanner.Key key,
    List<TrackedSpannerColumn> spannerNonPkColumns,
    List<String> spannerNonPkColumnNames,
    com.google.cloud.Timestamp spannerCommitTimestamp,
    TableRow tableRow) {
  // Read at the configured RPC priority (Validation elsewhere presumably guarantees
  // the provider is set — confirm; .get() would throw if absent).
  Options.ReadQueryUpdateTransactionOption options =
      Options.priority(spannerConfig.getRpcPriority().get());
  // Create a context that uses the custom call configuration (per-RPC deadline).
  Context context =
      Context.current()
          .withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, callContextConfigurator);
  // Do the snapshot read inside the custom context so the deadline applies to it.
  context.run(
      () -> {
        // Single-use read-only transaction pinned to the commit timestamp, so the row
        // reflects the state produced by the change being processed.
        try (ResultSet resultSet =
            spannerAccessor
                .getDatabaseClient()
                .singleUseReadOnlyTransaction(
                    TimestampBound.ofReadTimestamp(spannerCommitTimestamp))
                .read(
                    spannerTableName,
                    KeySet.singleKey(key),
                    spannerNonPkColumnNames,
                    options)) {
          // Copy the fetched column values into the outgoing BigQuery row in place.
          SpannerToBigQueryUtils.spannerSnapshotRowToBigQueryTableRow(
              resultSet, spannerNonPkColumns, tableRow);
        }
      });
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options)
*/
Pipeline pipeline = Pipeline.create(options);
DeadLetterQueueManager dlqManager = buildDlqManager(options);
String spannerProjectId = getSpannerProjectId(options);

String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";
Expand All @@ -166,7 +167,7 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options)
SpannerConfig spannerConfig =
SpannerConfig.create()
.withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
.withProjectId(options.getProject())
.withProjectId(spannerProjectId)
.withInstanceId(options.getSpannerInstanceId())
.withDatabaseId(options.getSpannerDatabaseId())
.withRpcPriority(options.getSpannerRpcPriority());
Expand All @@ -176,11 +177,16 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options)
.withSpannerConfig(spannerConfig)
.withMetadataInstance(options.getSpannerMetadataInstanceId())
.withMetadataDatabase(options.getSpannerMetadataDatabaseId())
.withChangeStreamName(options.getSpannerChangeStream())
.withChangeStreamName(options.getSpannerChangeStreamName())
.withInclusiveStartAt(startTimestamp)
.withInclusiveEndAt(endTimestamp)
.withRpcPriority(options.getSpannerRpcPriority());

String spannerMetadataTableName = options.getSpannerMetadataTableName();
if (spannerMetadataTableName != null) {
readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName);
}

PCollection<DataChangeRecord> dataChangeRecord =
pipeline
.apply("Read from Spanner Change Streams", readChangeStream)
Expand Down Expand Up @@ -217,7 +223,7 @@ public void process(
failsafeModJsonToTableRowOptions =
FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions.builder()
.setSpannerConfig(spannerConfig)
.setSpannerChangeStream(options.getSpannerChangeStream())
.setSpannerChangeStream(options.getSpannerChangeStreamName())
.setIgnoreFields(options.getIgnoreFields())
.setCoder(FAILSAFE_ELEMENT_CODER)
.build();
Expand All @@ -232,8 +238,8 @@ public void process(
bigQueryDynamicDestinationsOptions =
BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions.builder()
.setSpannerConfig(spannerConfig)
.setChangeStreamName(options.getSpannerChangeStream())
.setBigQueryProject(getBigQueryProject(options))
.setChangeStreamName(options.getSpannerChangeStreamName())
.setBigQueryProject(getBigQueryProjectId(options))
.setBigQueryDataset(options.getBigQueryDataset())
.setBigQueryTableTemplate(options.getBigQueryChangelogTableNameTemplate())
.build();
Expand Down Expand Up @@ -274,6 +280,7 @@ public void process(
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqDirectory)
.withTmpDirectory(tempDlqDirectory)
.setIncludePaneInfo(true)
.build());

PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
Expand All @@ -288,6 +295,7 @@ public void process(
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
.withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
.setIncludePaneInfo(true)
.build());

return pipeline.run();
Expand All @@ -306,7 +314,13 @@ private static DeadLetterQueueManager buildDlqManager(
return DeadLetterQueueManager.create(dlqDirectory, DLQ_MAX_RETRIES);
}

private static String getBigQueryProject(SpannerChangeStreamsToBigQueryOptions options) {
/**
 * Resolves the project to read change streams from: the explicitly configured Spanner
 * project ID, or the Dataflow job's own project when none was provided (empty string).
 */
private static String getSpannerProjectId(SpannerChangeStreamsToBigQueryOptions options) {
  String configuredProjectId = options.getSpannerProjectId();
  if (configuredProjectId.isEmpty()) {
    return options.getProject();
  }
  return configuredProjectId;
}

private static String getBigQueryProjectId(SpannerChangeStreamsToBigQueryOptions options) {
return options.getBigQueryProjectId().isEmpty()
? options.getProject()
: options.getBigQueryProjectId();
Expand Down

0 comments on commit 89f0074

Please sign in to comment.