Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add support for identity columns in import/export #1812

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.google.cloud.teleport.spanner.AvroUtil.DEFAULT_EXPRESSION;
import static com.google.cloud.teleport.spanner.AvroUtil.GENERATION_EXPRESSION;
import static com.google.cloud.teleport.spanner.AvroUtil.HIDDEN;
import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN;
import static com.google.cloud.teleport.spanner.AvroUtil.INPUT;
import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL;
import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT;
Expand Down Expand Up @@ -103,9 +104,6 @@ public Ddl toDdl(Collection<Schema> avroSchemas) {
builder.addChangeStream(toChangeStream(null, schema));
} else if (schema.getProp(SPANNER_SEQUENCE_OPTION + "0") != null
|| schema.getProp(SPANNER_SEQUENCE_KIND) != null) {
// Cloud Sequence always requires at least one option,
// `sequence_kind='bit_reversed_positive`, so `sequenceOption_0` must
// always be valid.
builder.addSequence(toSequence(null, schema));
} else if (SPANNER_NAMED_SCHEMA.equals(schema.getProp(SPANNER_ENTITY))) {
builder.addSchema(toSchema(null, schema));
Expand Down Expand Up @@ -454,7 +452,8 @@ public Sequence toSequence(String sequenceName, Schema schema) {
LOG.debug("Converting to Ddl sequenceName {}", sequenceName);
Sequence.Builder builder = Sequence.builder(dialect).name(sequenceName);

if (schema.getProp(SPANNER_SEQUENCE_KIND) != null) {
if (schema.getProp(SPANNER_SEQUENCE_KIND) != null
&& schema.getProp(SPANNER_SEQUENCE_KIND).equals("bit_reversed_positive")) {
builder.sequenceKind(schema.getProp(SPANNER_SEQUENCE_KIND));
}
if (schema.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN) != null
Expand All @@ -469,7 +468,12 @@ public Sequence toSequence(String sequenceName, Schema schema) {

ImmutableList.Builder<String> sequenceOptions = ImmutableList.builder();
for (int i = 0; schema.getProp(SPANNER_SEQUENCE_OPTION + i) != null; i++) {
sequenceOptions.add(schema.getProp(SPANNER_SEQUENCE_OPTION + i));
String prop = schema.getProp(SPANNER_SEQUENCE_OPTION + i);
if (prop.equals("sequence_kind=default")) {
// Specify no sequence kind by using the default_sequence_kind database option.
continue;
}
sequenceOptions.add(prop);
}
builder.options(sequenceOptions.build());

Expand Down Expand Up @@ -509,6 +513,22 @@ public Table toTable(String tableName, Schema schema) {
Column.Builder column = table.column(f.name());
String sqlType = f.getProp(SQL_TYPE);
String expression = f.getProp(GENERATION_EXPRESSION);
String identityColumn = f.getProp(IDENTITY_COLUMN);
if (identityColumn != null && Boolean.parseBoolean(identityColumn)) {
column.isIdentityColumn(true);
if (f.getProp(SPANNER_SEQUENCE_KIND) != null) {
column.sequenceKind(f.getProp(SPANNER_SEQUENCE_KIND));
}
if (f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN) != null
&& f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX) != null) {
column
.skipRangeMin(Long.valueOf(f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN)))
.skipRangeMax(Long.valueOf(f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX)));
}
if (f.getProp(SPANNER_SEQUENCE_COUNTER_START) != null) {
column.counterStartValue(Long.valueOf(f.getProp(SPANNER_SEQUENCE_COUNTER_START)));
}
}
if (expression != null) {
// This is a generated column.
if (Strings.isNullOrEmpty(sqlType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ private AvroUtil() {}
public static final String GENERATION_EXPRESSION = "generationExpression";
public static final String GOOGLE_FORMAT_VERSION = "googleFormatVersion";
public static final String GOOGLE_STORAGE = "googleStorage";
public static final String IDENTITY_COLUMN = "identityColumn";
public static final String INPUT = "Input";
public static final String NOT_NULL = "notNull";
public static final String OUTPUT = "Output";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_FORMAT_VERSION;
import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_STORAGE;
import static com.google.cloud.teleport.spanner.AvroUtil.HIDDEN;
import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN;
import static com.google.cloud.teleport.spanner.AvroUtil.INPUT;
import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL;
import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT;
Expand Down Expand Up @@ -175,7 +176,16 @@ public Collection<Schema> convert(Ddl ddl) {
// which are semantically logical entities.
fieldBuilder.type(SchemaBuilder.builder().nullType()).withDefault(null);
} else {
if (cm.defaultExpression() != null) {
if (cm.isIdentityColumn()) {
fieldBuilder.prop(IDENTITY_COLUMN, Boolean.toString(cm.isIdentityColumn()));
if (cm.sequenceKind() != null) {
fieldBuilder.prop(SPANNER_SEQUENCE_KIND, cm.sequenceKind());
}
fieldBuilder.prop(
SPANNER_SEQUENCE_COUNTER_START, String.valueOf(cm.counterStartValue()));
fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MIN, String.valueOf(cm.skipRangeMin()));
fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MAX, String.valueOf(cm.skipRangeMax()));
} else if (cm.defaultExpression() != null) {
fieldBuilder.prop(DEFAULT_EXPRESSION, cm.defaultExpression());
}
Schema avroType = avroType(cm.type(), table.name() + "_" + columnOrdinal++);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;

/** Cloud Spanner column. */
Expand Down Expand Up @@ -53,6 +55,20 @@ public abstract class Column implements Serializable {

public abstract boolean isStored();

public abstract boolean isIdentityColumn();

@Nullable
public abstract String sequenceKind();

@Nullable
public abstract Long counterStartValue();

@Nullable
public abstract Long skipRangeMin();

@Nullable
public abstract Long skipRangeMax();

public abstract boolean isPlacementKey();

public abstract Dialect dialect();
Expand All @@ -68,6 +84,7 @@ public static Builder builder(Dialect dialect) {
.columnOptions(ImmutableList.of())
.notNull(false)
.isGenerated(false)
.isIdentityColumn(false)
.isHidden(false)
.generationExpression("")
.isStored(false)
Expand Down Expand Up @@ -96,6 +113,25 @@ public void prettyPrint(Appendable appendable) throws IOException {
appendable.append(" (").append(defaultExpression()).append(")");
}
}
if (isIdentityColumn()) {
appendable.append(" GENERATED BY DEFAULT AS IDENTITY");
List<String> options = new ArrayList<>(3);
if (sequenceKind() != null && sequenceKind().equalsIgnoreCase("bit_reversed_positive")) {
options.add("BIT_REVERSED_POSITIVE");
}
if (skipRangeMin() != null && skipRangeMax() != null) {
options.add(
String.format(
"SKIP RANGE %d%s %d",
skipRangeMin(), dialect() == Dialect.POSTGRESQL ? "" : ",", skipRangeMax()));
}
if (counterStartValue() != null) {
options.add(String.format("START COUNTER WITH %d", counterStartValue()));
}
if (options.size() > 0) {
appendable.append(" (").append(String.join(" ", options)).append(")");
}
}
if (isGenerated()) {
if (dialect() == Dialect.POSTGRESQL) {
appendable.append(" GENERATED ALWAYS");
Expand Down Expand Up @@ -197,6 +233,16 @@ public Builder generatedAs(String expression) {

public abstract Builder isStored(boolean generated);

public abstract Builder isIdentityColumn(boolean identityColumn);

public abstract Builder sequenceKind(String sequenceKind);

public abstract Builder counterStartValue(Long value);

public abstract Builder skipRangeMin(Long value);

public abstract Builder skipRangeMax(Long value);

public Builder stored() {
return isStored(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public class DatabaseOptionAllowlist {
// allow list.
private DatabaseOptionAllowlist() {}

// Only those databse options whose name are included in the allowlist will be processed in
// Only those database options whose name are included in the allowlist will be processed in
// export/import pipelines.
public static final ImmutableList<String> DATABASE_OPTION_ALLOWLIST =
ImmutableList.of("version_retention_period", "opt_in_dataplacement_preview");
ImmutableList.of(
"version_retention_period", "opt_in_dataplacement_preview", "default_sequence_kind");
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,31 @@ private void listTables(Ddl.Builder builder) {
}
}

/**
 * Resolves the counter start value to record for an identity column.
 *
 * <p>Runs {@code GET_TABLE_COLUMN_IDENTITY_STATE} (prefixed with {@code spanner.} for the
 * PostgreSQL dialect) against the database. When the query yields a non-NULL value, that value
 * plus {@link Sequence#SEQUENCE_COUNTER_BUFFER} is returned; otherwise the caller-supplied
 * {@code initialCounter} is returned unchanged.
 *
 * @param initialCounter fallback value returned when the state query produces no row or a NULL
 *     value; may be {@code null}
 * @param qualifiedColumnName column identifier that is spliced verbatim into the single-quoted
 *     argument of the state function
 * @return the current identity state plus the buffer, or {@code initialCounter}
 * @throws IllegalArgumentException if the scanner's dialect is neither GoogleSQL nor PostgreSQL
 */
private Long updateCounterForIdentityColumn(Long initialCounter, String qualifiedColumnName) {
  final String stateFunction;
  switch (dialect) {
    case GOOGLE_STANDARD_SQL:
      stateFunction = "GET_TABLE_COLUMN_IDENTITY_STATE";
      break;
    case POSTGRESQL:
      stateFunction = "spanner.GET_TABLE_COLUMN_IDENTITY_STATE";
      break;
    default:
      throw new IllegalArgumentException("Unrecognized dialect: " + dialect);
  }
  // NOTE(review): the name is concatenated into a SQL string literal. This presumably is safe
  // because the name originates from the information schema; confirm it can never contain a
  // single quote.
  Statement sequenceCounterStatement =
      Statement.of("SELECT " + stateFunction + "('" + qualifiedColumnName + "')");

  // Only the first row is read, so the result set is never drained; close it explicitly to
  // release the underlying stream instead of leaving it open.
  try (ResultSet resultSetForCounter = context.executeQuery(sequenceCounterStatement)) {
    if (resultSetForCounter.next() && !resultSetForCounter.isNull(0)) {
      // Add a buffer to accommodate writes that may happen after import
      // is run. Note that this is not 100% failproof, since more writes may
      // happen and they will make the sequence advance past the buffer.
      return resultSetForCounter.getLong(0) + Sequence.SEQUENCE_COUNTER_BUFFER;
    }
  }
  return initialCounter;
}

private void listColumns(Ddl.Builder builder) {
Statement statement = listColumnsSQL();

Expand All @@ -320,11 +345,27 @@ private void listColumns(Ddl.Builder builder) {
String generationExpression = resultSet.isNull(7) ? "" : resultSet.getString(7);
boolean isStored = !resultSet.isNull(8) && resultSet.getString(8).equalsIgnoreCase("YES");
String defaultExpression = resultSet.isNull(9) ? null : resultSet.getString(9);
boolean isHidden = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getBoolean(10) : false;
boolean isIdentity = resultSet.getString(10).equalsIgnoreCase("YES");
String identityKind = resultSet.isNull(11) ? null : resultSet.getString(11);
// The start_with_counter value is the initial value and cannot represent the actual state of
// the counter. We need to apply the current counter to the DDL builder, instead of the one
// retrieved from Information Schema.
Long identityStartWithCounter =
resultSet.isNull(12) ? null : Long.valueOf(resultSet.getString(12));
if (isIdentity) {
identityStartWithCounter =
updateCounterForIdentityColumn(
identityStartWithCounter, tableSchema + "." + columnName);
}
Long identitySkipRangeMin =
resultSet.isNull(13) ? null : Long.valueOf(resultSet.getString(13));
Long identitySkipRangeMax =
resultSet.isNull(14) ? null : Long.valueOf(resultSet.getString(14));
boolean isHidden = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getBoolean(15) : false;
boolean isPlacementKey =
dialect == Dialect.GOOGLE_STANDARD_SQL
? resultSet.getBoolean(11)
: resultSet.getBoolean(10);
? resultSet.getBoolean(16)
: resultSet.getBoolean(15);

builder
.createTable(tableName)
Expand All @@ -336,6 +377,11 @@ private void listColumns(Ddl.Builder builder) {
.generationExpression(generationExpression)
.isStored(isStored)
.defaultExpression(defaultExpression)
.isIdentityColumn(isIdentity)
.sequenceKind(identityKind)
.counterStartValue(identityStartWithCounter)
.skipRangeMin(identitySkipRangeMin)
.skipRangeMax(identitySkipRangeMax)
.isPlacementKey(isPlacementKey)
.endColumn()
.endTable();
Expand All @@ -357,7 +403,8 @@ Statement listColumnsSQL() {
"SELECT c.table_schema, c.table_name, c.column_name,"
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
+ " c.is_generated, c.generation_expression, c.is_stored,"
+ " c.column_default, c.is_hidden,"
+ " c.column_default, c.is_identity, c.identity_kind, c.identity_start_with_counter,"
+ " c.identity_skip_range_min, c.identity_skip_range_max, c.is_hidden,"
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
+ " FROM information_schema.columns as c"
+ " LEFT JOIN placementkeycolumns AS pkc"
Expand All @@ -372,6 +419,8 @@ Statement listColumnsSQL() {
"SELECT c.table_schema, c.table_name, c.column_name,"
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
+ " c.is_generated, c.generation_expression, c.is_stored, c.column_default,"
+ " c.is_identity, c.identity_kind, c.identity_start_with_counter,"
+ " c.identity_skip_range_min, c.identity_skip_range_max,"
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
+ " FROM information_schema.columns as c"
+ " LEFT JOIN placementkeycolumns AS pkc"
Expand Down Expand Up @@ -1637,6 +1686,15 @@ private void listSequenceOptionsGoogleSQL(
options.add(optionName + "=" + optionValue);
}
}
// If the sequence kind is not specified, assign it to 'default'.
for (var entry : allOptions.entrySet()) {
if (!entry.getValue().toString().contains(Sequence.SEQUENCE_KIND)) {
entry
.getValue()
.add(
Sequence.SEQUENCE_KIND + "=" + GSQL_LITERAL_QUOTE + "default" + GSQL_LITERAL_QUOTE);
}
}

// Inject the current counter value to sequences that are in use.
for (Map.Entry<String, Long> entry : currentCounters.entrySet()) {
Expand Down Expand Up @@ -1684,8 +1742,7 @@ private void listSequenceOptionsPostgreSQL(
Long skipRangeMax = resultSet.isNull(5) ? null : resultSet.getLong(5);

if (sequenceKind == null) {
throw new IllegalArgumentException(
"Sequence kind for sequence " + sequenceName + " cannot be null");
sequenceKind = "default";
}
if (currentCounters.containsKey(sequenceName)) {
// The sequence is in use, we need to apply the current counter to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
private static final long serialVersionUID = 1L;
public static final long SEQUENCE_COUNTER_BUFFER = 1000L;
public static final String SEQUENCE_START_WITH_COUNTER = "start_with_counter";
public static final String SEQUENCE_KIND = "sequence_kind";

public abstract String name();

Expand Down Expand Up @@ -75,11 +76,14 @@
}

if (dialect() == Dialect.POSTGRESQL) {
if (!sequenceKind().equalsIgnoreCase("bit_reversed_positive")) {
throw new IllegalArgumentException(
String.format("Unrecognized sequence kind: %s.", sequenceKind()));
if (sequenceKind() != null && !sequenceKind().equalsIgnoreCase("default")) {
if (sequenceKind().equalsIgnoreCase("bit_reversed_positive")) {
appendable.append(" BIT_REVERSED_POSITIVE");
} else {
throw new IllegalArgumentException(
String.format("Unrecognized sequence kind: %s.", sequenceKind()));

Check warning on line 84 in v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Sequence.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Sequence.java#L83-L84

Added lines #L83 - L84 were not covered by tests
}
}
appendable.append(" BIT_REVERSED_POSITIVE");
if (skipRangeMin() != null && skipRangeMax() != null) {
appendable
.append(" SKIP RANGE ")
Expand Down
Loading
Loading