Skip to content

Commit

Permalink
feat(spanner): add support for identity columns in import/export (#1812)
Browse files Browse the repository at this point in the history
* feat(spanner): add support for identity columns in import/export

* Fix the GET_TABLE_COLUMN_IDENTITY_STATE call for PG.

* Add more changes and tests.

* Apply spotless changes.

* Update integration tests.

* Fix unit tests.

* Fix unit tests.

* Fix unit tests.
  • Loading branch information
hengfengli authored Dec 11, 2024
1 parent d96bd95 commit 6a50180
Show file tree
Hide file tree
Showing 15 changed files with 683 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.google.cloud.teleport.spanner.AvroUtil.DEFAULT_EXPRESSION;
import static com.google.cloud.teleport.spanner.AvroUtil.GENERATION_EXPRESSION;
import static com.google.cloud.teleport.spanner.AvroUtil.HIDDEN;
import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN;
import static com.google.cloud.teleport.spanner.AvroUtil.INPUT;
import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL;
import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT;
Expand Down Expand Up @@ -103,9 +104,6 @@ public Ddl toDdl(Collection<Schema> avroSchemas) {
builder.addChangeStream(toChangeStream(null, schema));
} else if (schema.getProp(SPANNER_SEQUENCE_OPTION + "0") != null
|| schema.getProp(SPANNER_SEQUENCE_KIND) != null) {
// Cloud Sequence always requires at least one option,
// `sequence_kind='bit_reversed_positive`, so `sequenceOption_0` must
// always be valid.
builder.addSequence(toSequence(null, schema));
} else if (SPANNER_NAMED_SCHEMA.equals(schema.getProp(SPANNER_ENTITY))) {
builder.addSchema(toSchema(null, schema));
Expand Down Expand Up @@ -454,7 +452,8 @@ public Sequence toSequence(String sequenceName, Schema schema) {
LOG.debug("Converting to Ddl sequenceName {}", sequenceName);
Sequence.Builder builder = Sequence.builder(dialect).name(sequenceName);

if (schema.getProp(SPANNER_SEQUENCE_KIND) != null) {
if (schema.getProp(SPANNER_SEQUENCE_KIND) != null
&& schema.getProp(SPANNER_SEQUENCE_KIND).equals("bit_reversed_positive")) {
builder.sequenceKind(schema.getProp(SPANNER_SEQUENCE_KIND));
}
if (schema.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN) != null
Expand All @@ -469,7 +468,12 @@ public Sequence toSequence(String sequenceName, Schema schema) {

ImmutableList.Builder<String> sequenceOptions = ImmutableList.builder();
for (int i = 0; schema.getProp(SPANNER_SEQUENCE_OPTION + i) != null; i++) {
sequenceOptions.add(schema.getProp(SPANNER_SEQUENCE_OPTION + i));
String prop = schema.getProp(SPANNER_SEQUENCE_OPTION + i);
if (prop.equals("sequence_kind=default")) {
// Specify no sequence kind by using the default_sequence_kind database option.
continue;
}
sequenceOptions.add(prop);
}
builder.options(sequenceOptions.build());

Expand Down Expand Up @@ -509,6 +513,22 @@ public Table toTable(String tableName, Schema schema) {
Column.Builder column = table.column(f.name());
String sqlType = f.getProp(SQL_TYPE);
String expression = f.getProp(GENERATION_EXPRESSION);
String identityColumn = f.getProp(IDENTITY_COLUMN);
if (identityColumn != null && Boolean.parseBoolean(identityColumn)) {
column.isIdentityColumn(true);
if (f.getProp(SPANNER_SEQUENCE_KIND) != null) {
column.sequenceKind(f.getProp(SPANNER_SEQUENCE_KIND));
}
if (f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN) != null
&& f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX) != null) {
column
.skipRangeMin(Long.valueOf(f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN)))
.skipRangeMax(Long.valueOf(f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX)));
}
if (f.getProp(SPANNER_SEQUENCE_COUNTER_START) != null) {
column.counterStartValue(Long.valueOf(f.getProp(SPANNER_SEQUENCE_COUNTER_START)));
}
}
if (expression != null) {
// This is a generated column.
if (Strings.isNullOrEmpty(sqlType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ private AvroUtil() {}
public static final String GENERATION_EXPRESSION = "generationExpression";
public static final String GOOGLE_FORMAT_VERSION = "googleFormatVersion";
public static final String GOOGLE_STORAGE = "googleStorage";
public static final String IDENTITY_COLUMN = "identityColumn";
public static final String INPUT = "Input";
public static final String NOT_NULL = "notNull";
public static final String OUTPUT = "Output";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_FORMAT_VERSION;
import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_STORAGE;
import static com.google.cloud.teleport.spanner.AvroUtil.HIDDEN;
import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN;
import static com.google.cloud.teleport.spanner.AvroUtil.INPUT;
import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL;
import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT;
Expand Down Expand Up @@ -175,7 +176,16 @@ public Collection<Schema> convert(Ddl ddl) {
// which are semantically logical entities.
fieldBuilder.type(SchemaBuilder.builder().nullType()).withDefault(null);
} else {
if (cm.defaultExpression() != null) {
if (cm.isIdentityColumn()) {
fieldBuilder.prop(IDENTITY_COLUMN, Boolean.toString(cm.isIdentityColumn()));
if (cm.sequenceKind() != null) {
fieldBuilder.prop(SPANNER_SEQUENCE_KIND, cm.sequenceKind());
}
fieldBuilder.prop(
SPANNER_SEQUENCE_COUNTER_START, String.valueOf(cm.counterStartValue()));
fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MIN, String.valueOf(cm.skipRangeMin()));
fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MAX, String.valueOf(cm.skipRangeMax()));
} else if (cm.defaultExpression() != null) {
fieldBuilder.prop(DEFAULT_EXPRESSION, cm.defaultExpression());
}
Schema avroType = avroType(cm.type(), table.name() + "_" + columnOrdinal++);
Expand Down
46 changes: 46 additions & 0 deletions v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;

/** Cloud Spanner column. */
Expand Down Expand Up @@ -53,6 +55,20 @@ public abstract class Column implements Serializable {

public abstract boolean isStored();

public abstract boolean isIdentityColumn();

@Nullable
public abstract String sequenceKind();

@Nullable
public abstract Long counterStartValue();

@Nullable
public abstract Long skipRangeMin();

@Nullable
public abstract Long skipRangeMax();

public abstract boolean isPlacementKey();

public abstract Dialect dialect();
Expand All @@ -68,6 +84,7 @@ public static Builder builder(Dialect dialect) {
.columnOptions(ImmutableList.of())
.notNull(false)
.isGenerated(false)
.isIdentityColumn(false)
.isHidden(false)
.generationExpression("")
.isStored(false)
Expand Down Expand Up @@ -96,6 +113,25 @@ public void prettyPrint(Appendable appendable) throws IOException {
appendable.append(" (").append(defaultExpression()).append(")");
}
}
if (isIdentityColumn()) {
appendable.append(" GENERATED BY DEFAULT AS IDENTITY");
List<String> options = new ArrayList<>(3);
if (sequenceKind() != null && sequenceKind().equalsIgnoreCase("bit_reversed_positive")) {
options.add("BIT_REVERSED_POSITIVE");
}
if (skipRangeMin() != null && skipRangeMax() != null) {
options.add(
String.format(
"SKIP RANGE %d%s %d",
skipRangeMin(), dialect() == Dialect.POSTGRESQL ? "" : ",", skipRangeMax()));
}
if (counterStartValue() != null) {
options.add(String.format("START COUNTER WITH %d", counterStartValue()));
}
if (options.size() > 0) {
appendable.append(" (").append(String.join(" ", options)).append(")");
}
}
if (isGenerated()) {
if (dialect() == Dialect.POSTGRESQL) {
appendable.append(" GENERATED ALWAYS");
Expand Down Expand Up @@ -197,6 +233,16 @@ public Builder generatedAs(String expression) {

public abstract Builder isStored(boolean generated);

public abstract Builder isIdentityColumn(boolean identityColumn);

public abstract Builder sequenceKind(String sequenceKind);

public abstract Builder counterStartValue(Long value);

public abstract Builder skipRangeMin(Long value);

public abstract Builder skipRangeMax(Long value);

public Builder stored() {
return isStored(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public class DatabaseOptionAllowlist {
// allow list.
private DatabaseOptionAllowlist() {}

// Only those databse options whose name are included in the allowlist will be processed in
// Only those database options whose name are included in the allowlist will be processed in
// export/import pipelines.
public static final ImmutableList<String> DATABASE_OPTION_ALLOWLIST =
ImmutableList.of("version_retention_period", "opt_in_dataplacement_preview");
ImmutableList.of(
"version_retention_period", "opt_in_dataplacement_preview", "default_sequence_kind");
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,31 @@ private void listTables(Ddl.Builder builder) {
}
}

private Long updateCounterForIdentityColumn(Long initialCounter, String qualifiedColumnName) {
Statement sequenceCounterStatement;
switch (dialect) {
case GOOGLE_STANDARD_SQL:
sequenceCounterStatement =
Statement.of("SELECT GET_TABLE_COLUMN_IDENTITY_STATE('" + qualifiedColumnName + "')");
break;
case POSTGRESQL:
sequenceCounterStatement =
Statement.of(
"SELECT spanner.GET_TABLE_COLUMN_IDENTITY_STATE('" + qualifiedColumnName + "')");
break;
default:
throw new IllegalArgumentException("Unrecognized dialect: " + dialect);
}
ResultSet resultSetForCounter = context.executeQuery(sequenceCounterStatement);
if (resultSetForCounter.next() && !resultSetForCounter.isNull(0)) {
// Add a buffer to accommodate writes that may happen after import
// is run. Note that this is not 100% failproof, since more writes may
// happen and they will make the sequence advances past the buffer.
return resultSetForCounter.getLong(0) + Sequence.SEQUENCE_COUNTER_BUFFER;
}
return initialCounter;
}

private void listColumns(Ddl.Builder builder) {
Statement statement = listColumnsSQL();

Expand All @@ -320,11 +345,27 @@ private void listColumns(Ddl.Builder builder) {
String generationExpression = resultSet.isNull(7) ? "" : resultSet.getString(7);
boolean isStored = !resultSet.isNull(8) && resultSet.getString(8).equalsIgnoreCase("YES");
String defaultExpression = resultSet.isNull(9) ? null : resultSet.getString(9);
boolean isHidden = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getBoolean(10) : false;
boolean isIdentity = resultSet.getString(10).equalsIgnoreCase("YES");
String identityKind = resultSet.isNull(11) ? null : resultSet.getString(11);
// The start_with_counter value is the initial value and cannot represent the actual state of
// the counter. We need to apply the current counter to the DDL builder, instead of the one
// retrieved from Information Schema.
Long identityStartWithCounter =
resultSet.isNull(12) ? null : Long.valueOf(resultSet.getString(12));
if (isIdentity) {
identityStartWithCounter =
updateCounterForIdentityColumn(
identityStartWithCounter, tableSchema + "." + columnName);
}
Long identitySkipRangeMin =
resultSet.isNull(13) ? null : Long.valueOf(resultSet.getString(13));
Long identitySkipRangeMax =
resultSet.isNull(14) ? null : Long.valueOf(resultSet.getString(14));
boolean isHidden = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getBoolean(15) : false;
boolean isPlacementKey =
dialect == Dialect.GOOGLE_STANDARD_SQL
? resultSet.getBoolean(11)
: resultSet.getBoolean(10);
? resultSet.getBoolean(16)
: resultSet.getBoolean(15);

builder
.createTable(tableName)
Expand All @@ -336,6 +377,11 @@ private void listColumns(Ddl.Builder builder) {
.generationExpression(generationExpression)
.isStored(isStored)
.defaultExpression(defaultExpression)
.isIdentityColumn(isIdentity)
.sequenceKind(identityKind)
.counterStartValue(identityStartWithCounter)
.skipRangeMin(identitySkipRangeMin)
.skipRangeMax(identitySkipRangeMax)
.isPlacementKey(isPlacementKey)
.endColumn()
.endTable();
Expand All @@ -357,7 +403,8 @@ Statement listColumnsSQL() {
"SELECT c.table_schema, c.table_name, c.column_name,"
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
+ " c.is_generated, c.generation_expression, c.is_stored,"
+ " c.column_default, c.is_hidden,"
+ " c.column_default, c.is_identity, c.identity_kind, c.identity_start_with_counter,"
+ " c.identity_skip_range_min, c.identity_skip_range_max, c.is_hidden,"
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
+ " FROM information_schema.columns as c"
+ " LEFT JOIN placementkeycolumns AS pkc"
Expand All @@ -372,6 +419,8 @@ Statement listColumnsSQL() {
"SELECT c.table_schema, c.table_name, c.column_name,"
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
+ " c.is_generated, c.generation_expression, c.is_stored, c.column_default,"
+ " c.is_identity, c.identity_kind, c.identity_start_with_counter,"
+ " c.identity_skip_range_min, c.identity_skip_range_max,"
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
+ " FROM information_schema.columns as c"
+ " LEFT JOIN placementkeycolumns AS pkc"
Expand Down Expand Up @@ -1637,6 +1686,15 @@ private void listSequenceOptionsGoogleSQL(
options.add(optionName + "=" + optionValue);
}
}
// If the sequence kind is not specified, assign it to 'default'.
for (var entry : allOptions.entrySet()) {
if (!entry.getValue().toString().contains(Sequence.SEQUENCE_KIND)) {
entry
.getValue()
.add(
Sequence.SEQUENCE_KIND + "=" + GSQL_LITERAL_QUOTE + "default" + GSQL_LITERAL_QUOTE);
}
}

// Inject the current counter value to sequences that are in use.
for (Map.Entry<String, Long> entry : currentCounters.entrySet()) {
Expand Down Expand Up @@ -1684,8 +1742,7 @@ private void listSequenceOptionsPostgreSQL(
Long skipRangeMax = resultSet.isNull(5) ? null : resultSet.getLong(5);

if (sequenceKind == null) {
throw new IllegalArgumentException(
"Sequence kind for sequence " + sequenceName + " cannot be null");
sequenceKind = "default";
}
if (currentCounters.containsKey(sequenceName)) {
// The sequence is in use, we need to apply the current counter to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public abstract class Sequence implements Serializable {
private static final long serialVersionUID = 1L;
public static final long SEQUENCE_COUNTER_BUFFER = 1000L;
public static final String SEQUENCE_START_WITH_COUNTER = "start_with_counter";
public static final String SEQUENCE_KIND = "sequence_kind";

public abstract String name();

Expand Down Expand Up @@ -75,11 +76,14 @@ public void prettyPrint(Appendable appendable) throws IOException {
}

if (dialect() == Dialect.POSTGRESQL) {
if (!sequenceKind().equalsIgnoreCase("bit_reversed_positive")) {
throw new IllegalArgumentException(
String.format("Unrecognized sequence kind: %s.", sequenceKind()));
if (sequenceKind() != null && !sequenceKind().equalsIgnoreCase("default")) {
if (sequenceKind().equalsIgnoreCase("bit_reversed_positive")) {
appendable.append(" BIT_REVERSED_POSITIVE");
} else {
throw new IllegalArgumentException(
String.format("Unrecognized sequence kind: %s.", sequenceKind()));
}
}
appendable.append(" BIT_REVERSED_POSITIVE");
if (skipRangeMin() != null && skipRangeMax() != null) {
appendable
.append(" SKIP RANGE ")
Expand Down
Loading

0 comments on commit 6a50180

Please sign in to comment.