From 6a5018012db9ae4c29d1374e7e91f29193e3d78f Mon Sep 17 00:00:00 2001 From: Hengfeng Li Date: Wed, 11 Dec 2024 11:50:53 +1100 Subject: [PATCH] feat(spanner): add support for identity columns in import/export (#1812) * feat(spanner): add support for identity columns in import/export * Fix the GET_TABLE_COLUMN_IDENTITY_STATE call for PG. * Add more changes and tests. * Apply spotless changes. * Update integration tests. * Fix unit tests. * Fix unit tests. * Fix unit tests. --- .../spanner/AvroSchemaToDdlConverter.java | 30 +++- .../cloud/teleport/spanner/AvroUtil.java | 1 + .../spanner/DdlToAvroSchemaConverter.java | 12 +- .../cloud/teleport/spanner/ddl/Column.java | 46 +++++++ .../spanner/ddl/DatabaseOptionAllowlist.java | 5 +- .../spanner/ddl/InformationSchemaScanner.java | 69 +++++++++- .../cloud/teleport/spanner/ddl/Sequence.java | 12 +- .../spanner/AvroSchemaToDdlConverterTest.java | 74 +++++++++- .../cloud/teleport/spanner/CopyDbTest.java | 128 +++++++++++++++++- .../spanner/DdlToAvroSchemaConverterTest.java | 93 ++++++++++++- .../teleport/spanner/ImportFromAvroTest.java | 86 +++++++++++- .../cloud/teleport/spanner/ddl/DdlTest.java | 92 ++++++++++++- .../ddl/InformationSchemaScannerIT.java | 67 ++++++++- .../ddl/InformationSchemaScannerTest.java | 6 +- .../ddl/RandomInsertMutationGenerator.java | 2 +- 15 files changed, 683 insertions(+), 40 deletions(-) diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java index 4047d86f90..39c6097a58 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java @@ -18,6 +18,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.DEFAULT_EXPRESSION; import static com.google.cloud.teleport.spanner.AvroUtil.GENERATION_EXPRESSION; import static com.google.cloud.teleport.spanner.AvroUtil.HIDDEN; +import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN; import static com.google.cloud.teleport.spanner.AvroUtil.INPUT; import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL; import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT; @@ -103,9 +104,6 @@ public Ddl toDdl(Collection avroSchemas) { builder.addChangeStream(toChangeStream(null, schema)); } else if (schema.getProp(SPANNER_SEQUENCE_OPTION + "0") != null || schema.getProp(SPANNER_SEQUENCE_KIND) != null) { - // Cloud Sequence always requires at least one option, - // `sequence_kind='bit_reversed_positive`, so `sequenceOption_0` must - // always be valid. builder.addSequence(toSequence(null, schema)); } else if (SPANNER_NAMED_SCHEMA.equals(schema.getProp(SPANNER_ENTITY))) { builder.addSchema(toSchema(null, schema)); @@ -454,7 +452,8 @@ public Sequence toSequence(String sequenceName, Schema schema) { LOG.debug("Converting to Ddl sequenceName {}", sequenceName); Sequence.Builder builder = Sequence.builder(dialect).name(sequenceName); - if (schema.getProp(SPANNER_SEQUENCE_KIND) != null) { + if (schema.getProp(SPANNER_SEQUENCE_KIND) != null + && schema.getProp(SPANNER_SEQUENCE_KIND).equals("bit_reversed_positive")) { builder.sequenceKind(schema.getProp(SPANNER_SEQUENCE_KIND)); } if (schema.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN) != null @@ -469,7 +468,12 @@ public Sequence toSequence(String sequenceName, Schema schema) { ImmutableList.Builder sequenceOptions = ImmutableList.builder(); for (int i = 0; schema.getProp(SPANNER_SEQUENCE_OPTION + i) != null; i++) { - sequenceOptions.add(schema.getProp(SPANNER_SEQUENCE_OPTION + i)); + String prop = schema.getProp(SPANNER_SEQUENCE_OPTION + i); + if (prop.equals("sequence_kind=default")) { + // Specify no sequence kind by using the default_sequence_kind database option. + continue; + } + sequenceOptions.add(prop); } builder.options(sequenceOptions.build()); @@ -509,6 +513,22 @@ public Table toTable(String tableName, Schema schema) { Column.Builder column = table.column(f.name()); String sqlType = f.getProp(SQL_TYPE); String expression = f.getProp(GENERATION_EXPRESSION); + String identityColumn = f.getProp(IDENTITY_COLUMN); + if (identityColumn != null && Boolean.parseBoolean(identityColumn)) { + column.isIdentityColumn(true); + if (f.getProp(SPANNER_SEQUENCE_KIND) != null) { + column.sequenceKind(f.getProp(SPANNER_SEQUENCE_KIND)); + } + if (f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN) != null + && f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX) != null) { + column + .skipRangeMin(Long.valueOf(f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN))) + .skipRangeMax(Long.valueOf(f.getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX))); + } + if (f.getProp(SPANNER_SEQUENCE_COUNTER_START) != null) { + column.counterStartValue(Long.valueOf(f.getProp(SPANNER_SEQUENCE_COUNTER_START))); + } + } if (expression != null) { // This is a generated column. if (Strings.isNullOrEmpty(sqlType)) { diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java index 544b3debf4..8e3782f862 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java @@ -27,6 +27,7 @@ private AvroUtil() {} public static final String GENERATION_EXPRESSION = "generationExpression"; public static final String GOOGLE_FORMAT_VERSION = "googleFormatVersion"; public static final String GOOGLE_STORAGE = "googleStorage"; + public static final String IDENTITY_COLUMN = "identityColumn"; public static final String INPUT = "Input"; public static final String NOT_NULL = "notNull"; public static final String OUTPUT = "Output"; diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java b/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java index c322c470fa..b0d1954a8a 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java @@ -20,6 +20,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_FORMAT_VERSION; import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_STORAGE; import static com.google.cloud.teleport.spanner.AvroUtil.HIDDEN; +import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN; import static com.google.cloud.teleport.spanner.AvroUtil.INPUT; import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL; import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT; @@ -175,7 +176,16 @@ public Collection convert(Ddl ddl) { // which are semantically logical entities. fieldBuilder.type(SchemaBuilder.builder().nullType()).withDefault(null); } else { - if (cm.defaultExpression() != null) { + if (cm.isIdentityColumn()) { + fieldBuilder.prop(IDENTITY_COLUMN, Boolean.toString(cm.isIdentityColumn())); + if (cm.sequenceKind() != null) { + fieldBuilder.prop(SPANNER_SEQUENCE_KIND, cm.sequenceKind()); + } + fieldBuilder.prop( + SPANNER_SEQUENCE_COUNTER_START, String.valueOf(cm.counterStartValue())); + fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MIN, String.valueOf(cm.skipRangeMin())); + fieldBuilder.prop(SPANNER_SEQUENCE_SKIP_RANGE_MAX, String.valueOf(cm.skipRangeMax())); + } else if (cm.defaultExpression() != null) { fieldBuilder.prop(DEFAULT_EXPRESSION, cm.defaultExpression()); } Schema avroType = avroType(cm.type(), table.name() + "_" + columnOrdinal++); diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Column.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Column.java index 6ef754f01a..4d3f43614a 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Column.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Column.java @@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import javax.annotation.Nullable; /** Cloud Spanner column. */ @@ -53,6 +55,20 @@ public abstract class Column implements Serializable { public abstract boolean isStored(); + public abstract boolean isIdentityColumn(); + + @Nullable + public abstract String sequenceKind(); + + @Nullable + public abstract Long counterStartValue(); + + @Nullable + public abstract Long skipRangeMin(); + + @Nullable + public abstract Long skipRangeMax(); + public abstract boolean isPlacementKey(); public abstract Dialect dialect(); @@ -68,6 +84,7 @@ public static Builder builder(Dialect dialect) { .columnOptions(ImmutableList.of()) .notNull(false) .isGenerated(false) + .isIdentityColumn(false) .isHidden(false) .generationExpression("") .isStored(false) @@ -96,6 +113,25 @@ public void prettyPrint(Appendable appendable) throws IOException { appendable.append(" (").append(defaultExpression()).append(")"); } } + if (isIdentityColumn()) { + appendable.append(" GENERATED BY DEFAULT AS IDENTITY"); + List options = new ArrayList<>(3); + if (sequenceKind() != null && sequenceKind().equalsIgnoreCase("bit_reversed_positive")) { + options.add("BIT_REVERSED_POSITIVE"); + } + if (skipRangeMin() != null && skipRangeMax() != null) { + options.add( + String.format( + "SKIP RANGE %d%s %d", + skipRangeMin(), dialect() == Dialect.POSTGRESQL ? "" : ",", skipRangeMax())); + } + if (counterStartValue() != null) { + options.add(String.format("START COUNTER WITH %d", counterStartValue())); + } + if (options.size() > 0) { + appendable.append(" (").append(String.join(" ", options)).append(")"); + } + } if (isGenerated()) { if (dialect() == Dialect.POSTGRESQL) { appendable.append(" GENERATED ALWAYS"); @@ -197,6 +233,16 @@ public Builder generatedAs(String expression) { public abstract Builder isStored(boolean generated); + public abstract Builder isIdentityColumn(boolean identityColumn); + + public abstract Builder sequenceKind(String sequenceKind); + + public abstract Builder counterStartValue(Long value); + + public abstract Builder skipRangeMin(Long value); + + public abstract Builder skipRangeMax(Long value); + public Builder stored() { return isStored(true); } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/DatabaseOptionAllowlist.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/DatabaseOptionAllowlist.java index f0575e3e50..ed1679449b 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/DatabaseOptionAllowlist.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/DatabaseOptionAllowlist.java @@ -24,8 +24,9 @@ public class DatabaseOptionAllowlist { // allow list. private DatabaseOptionAllowlist() {} - // Only those databse options whose name are included in the allowlist will be processed in + // Only those database options whose name are included in the allowlist will be processed in // export/import pipelines. public static final ImmutableList DATABASE_OPTION_ALLOWLIST = - ImmutableList.of("version_retention_period", "opt_in_dataplacement_preview"); + ImmutableList.of( + "version_retention_period", "opt_in_dataplacement_preview", "default_sequence_kind"); } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java index 91dd9c2e39..3d79d217b5 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java @@ -301,6 +301,31 @@ private void listTables(Ddl.Builder builder) { } } + private Long updateCounterForIdentityColumn(Long initialCounter, String qualifiedColumnName) { + Statement sequenceCounterStatement; + switch (dialect) { + case GOOGLE_STANDARD_SQL: + sequenceCounterStatement = + Statement.of("SELECT GET_TABLE_COLUMN_IDENTITY_STATE('" + qualifiedColumnName + "')"); + break; + case POSTGRESQL: + sequenceCounterStatement = + Statement.of( + "SELECT spanner.GET_TABLE_COLUMN_IDENTITY_STATE('" + qualifiedColumnName + "')"); + break; + default: + throw new IllegalArgumentException("Unrecognized dialect: " + dialect); + } + ResultSet resultSetForCounter = context.executeQuery(sequenceCounterStatement); + if (resultSetForCounter.next() && !resultSetForCounter.isNull(0)) { + // Add a buffer to accommodate writes that may happen after import + // is run. Note that this is not 100% failproof, since more writes may + // happen and they will make the sequence advances past the buffer. + return resultSetForCounter.getLong(0) + Sequence.SEQUENCE_COUNTER_BUFFER; + } + return initialCounter; + } + private void listColumns(Ddl.Builder builder) { Statement statement = listColumnsSQL(); @@ -320,11 +345,27 @@ private void listColumns(Ddl.Builder builder) { String generationExpression = resultSet.isNull(7) ? "" : resultSet.getString(7); boolean isStored = !resultSet.isNull(8) && resultSet.getString(8).equalsIgnoreCase("YES"); String defaultExpression = resultSet.isNull(9) ? null : resultSet.getString(9); - boolean isHidden = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getBoolean(10) : false; + boolean isIdentity = resultSet.getString(10).equalsIgnoreCase("YES"); + String identityKind = resultSet.isNull(11) ? null : resultSet.getString(11); + // The start_with_counter value is the initial value and cannot represent the actual state of + // the counter. We need to apply the current counter to the DDL builder, instead of the one + // retrieved from Information Schema. + Long identityStartWithCounter = + resultSet.isNull(12) ? null : Long.valueOf(resultSet.getString(12)); + if (isIdentity) { + identityStartWithCounter = + updateCounterForIdentityColumn( + identityStartWithCounter, tableSchema + "." + columnName); + } + Long identitySkipRangeMin = + resultSet.isNull(13) ? null : Long.valueOf(resultSet.getString(13)); + Long identitySkipRangeMax = + resultSet.isNull(14) ? null : Long.valueOf(resultSet.getString(14)); + boolean isHidden = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getBoolean(15) : false; boolean isPlacementKey = dialect == Dialect.GOOGLE_STANDARD_SQL - ? resultSet.getBoolean(11) - : resultSet.getBoolean(10); + ? resultSet.getBoolean(16) + : resultSet.getBoolean(15); builder .createTable(tableName) @@ -336,6 +377,11 @@ private void listColumns(Ddl.Builder builder) { .generationExpression(generationExpression) .isStored(isStored) .defaultExpression(defaultExpression) + .isIdentityColumn(isIdentity) + .sequenceKind(identityKind) + .counterStartValue(identityStartWithCounter) + .skipRangeMin(identitySkipRangeMin) + .skipRangeMax(identitySkipRangeMax) .isPlacementKey(isPlacementKey) .endColumn() .endTable(); @@ -357,7 +403,8 @@ Statement listColumnsSQL() { "SELECT c.table_schema, c.table_name, c.column_name," + " c.ordinal_position, c.spanner_type, c.is_nullable," + " c.is_generated, c.generation_expression, c.is_stored," - + " c.column_default, c.is_hidden," + + " c.column_default, c.is_identity, c.identity_kind, c.identity_start_with_counter," + + " c.identity_skip_range_min, c.identity_skip_range_max, c.is_hidden," + " pkc.constraint_name IS NOT NULL AS is_placement_key" + " FROM information_schema.columns as c" + " LEFT JOIN placementkeycolumns AS pkc" @@ -372,6 +419,8 @@ Statement listColumnsSQL() { "SELECT c.table_schema, c.table_name, c.column_name," + " c.ordinal_position, c.spanner_type, c.is_nullable," + " c.is_generated, c.generation_expression, c.is_stored, c.column_default," + + " c.is_identity, c.identity_kind, c.identity_start_with_counter," + + " c.identity_skip_range_min, c.identity_skip_range_max," + " pkc.constraint_name IS NOT NULL AS is_placement_key" + " FROM information_schema.columns as c" + " LEFT JOIN placementkeycolumns AS pkc" @@ -1637,6 +1686,15 @@ private void listSequenceOptionsGoogleSQL( options.add(optionName + "=" + optionValue); } } + // If the sequence kind is not specified, assign it to 'default'. + for (var entry : allOptions.entrySet()) { + if (!entry.getValue().toString().contains(Sequence.SEQUENCE_KIND)) { + entry + .getValue() + .add( + Sequence.SEQUENCE_KIND + "=" + GSQL_LITERAL_QUOTE + "default" + GSQL_LITERAL_QUOTE); + } + } // Inject the current counter value to sequences that are in use. for (Map.Entry entry : currentCounters.entrySet()) { @@ -1684,8 +1742,7 @@ private void listSequenceOptionsPostgreSQL( Long skipRangeMax = resultSet.isNull(5) ? null : resultSet.getLong(5); if (sequenceKind == null) { - throw new IllegalArgumentException( - "Sequence kind for sequence " + sequenceName + " cannot be null"); + sequenceKind = "default"; } if (currentCounters.containsKey(sequenceName)) { // The sequence is in use, we need to apply the current counter to diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Sequence.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Sequence.java index fedf940d18..e407522565 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Sequence.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Sequence.java @@ -31,6 +31,7 @@ public abstract class Sequence implements Serializable { private static final long serialVersionUID = 1L; public static final long SEQUENCE_COUNTER_BUFFER = 1000L; public static final String SEQUENCE_START_WITH_COUNTER = "start_with_counter"; + public static final String SEQUENCE_KIND = "sequence_kind"; public abstract String name(); @@ -75,11 +76,14 @@ public void prettyPrint(Appendable appendable) throws IOException { } if (dialect() == Dialect.POSTGRESQL) { - if (!sequenceKind().equalsIgnoreCase("bit_reversed_positive")) { - throw new IllegalArgumentException( - String.format("Unrecognized sequence kind: %s.", sequenceKind())); + if (sequenceKind() != null && !sequenceKind().equalsIgnoreCase("default")) { + if (sequenceKind().equalsIgnoreCase("bit_reversed_positive")) { + appendable.append(" BIT_REVERSED_POSITIVE"); + } else { + throw new IllegalArgumentException( + String.format("Unrecognized sequence kind: %s.", sequenceKind())); + } } - appendable.append(" BIT_REVERSED_POSITIVE"); if (skipRangeMin() != null && skipRangeMax() != null) { appendable .append(" SKIP RANGE ") diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java index 4be7a46a48..0100384039 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java @@ -83,6 +83,23 @@ public void simple() { + " \"generationExpression\" : \"CONCAT(first_name, ' ', last_name)\"," + " \"stored\" : \"true\"" + " }, {" + + " \"name\" : \"identity_column\"," + + " \"type\" : [ \"null\", \"long\" ]," + + " \"sqlType\" : \"INT64\"," + + " \"identityColumn\" : \"true\"," + + " \"sequenceKind\" : \"bit_reversed_positive\"," + + " \"skipRangeMin\" : \"2000\"," + + " \"skipRangeMax\" : \"3000\"," + + " \"counterStartValue\" : \"1000\"" + + " }, {" + + " \"name\" : \"identity_column_no_kind\"," + + " \"type\" : [ \"null\", \"long\" ]," + + " \"sqlType\" : \"INT64\"," + + " \"identityColumn\" : \"true\"," + + " \"skipRangeMin\" : \"2000\"," + + " \"skipRangeMax\" : \"3000\"," + + " \"counterStartValue\" : \"1000\"" + + " }, {" + " \"name\" : \"numeric\"," + " \"type\" : [\"null\", {\"type\":\"bytes\",\"logicalType\":\"decimal\"}]," + " \"sqlType\" : \"NUMERIC\"" @@ -231,6 +248,10 @@ public void simple() { + " `first_name` STRING(10) DEFAULT ('John')," + " `last_name` STRING(MAX)," + " `full_name` STRING(MAX) AS (CONCAT(first_name, ' ', last_name)) STORED," + + " `identity_column` INT64 GENERATED BY DEFAULT AS IDENTITY (" + + "BIT_REVERSED_POSITIVE SKIP RANGE 2000, 3000 START COUNTER WITH 1000)," + + " `identity_column_no_kind` INT64 GENERATED BY DEFAULT AS IDENTITY (" + + "SKIP RANGE 2000, 3000 START COUNTER WITH 1000)," + " `numeric` NUMERIC," + " `numeric2` NUMERIC," + " `notNumeric` BYTES(MAX)," @@ -310,6 +331,23 @@ public void pgSimple() { + " \"generationExpression\" : \"CONCAT(first_name, ' ', last_name)\"," + " \"stored\" : \"true\"" + " }, {" + + " \"name\" : \"identity_column\"," + + " \"type\" : [ \"null\", \"long\" ]," + + " \"sqlType\" : \"bigint\"," + + " \"identityColumn\" : \"true\"," + + " \"sequenceKind\" : \"bit_reversed_positive\"," + + " \"skipRangeMin\" : \"2000\"," + + " \"skipRangeMax\" : \"3000\"," + + " \"counterStartValue\" : \"1000\"" + + " }, {" + + " \"name\" : \"identity_column_no_kind\"," + + " \"type\" : [ \"null\", \"long\" ]," + + " \"sqlType\" : \"bigint\"," + + " \"identityColumn\" : \"true\"," + + " \"skipRangeMin\" : \"2000\"," + + " \"skipRangeMax\" : \"3000\"," + + " \"counterStartValue\" : \"1000\"" + + " }, {" + " \"name\" : \"numeric\"," + " \"type\" : [\"null\", {\"type\":\"bytes\",\"logicalType\":\"decimal\"}]," + " \"sqlType\" : \"numeric\"" @@ -422,6 +460,10 @@ public void pgSimple() { + " \"last_name\" character varying," + " \"full_name\" character varying GENERATED ALWAYS AS" + " (CONCAT(first_name, ' ', last_name)) STORED," + + " \"identity_column\" bigint GENERATED BY DEFAULT AS IDENTITY (" + + "BIT_REVERSED_POSITIVE SKIP RANGE 2000 3000 START COUNTER WITH 1000)," + + " \"identity_column_no_kind\" bigint GENERATED BY DEFAULT AS IDENTITY (" + + "SKIP RANGE 2000 3000 START COUNTER WITH 1000)," + " \"numeric\" numeric," + " \"numeric2\" numeric," + " \"notNumeric\" bytea," @@ -938,15 +980,26 @@ public void sequences() { + " \"googleFormatVersion\" : \"booleans\"," + " \"sequenceOption_0\" : \"sequence_kind=\\\"bit_reversed_positive\\\"\"" + "}"; + String avroString4 = + "{" + + " \"type\" : \"record\"," + + " \"name\" : \"Sequence4\"," + + " \"fields\" : []," + + " \"namespace\" : \"spannertest\"," + + " \"googleStorage\" : \"CloudSpanner\"," + + " \"googleFormatVersion\" : \"booleans\"," + + " \"sequenceOption_0\" : \"sequence_kind=default\"" + + "}"; Collection schemas = new ArrayList<>(); Schema.Parser parser = new Schema.Parser(); schemas.add(parser.parse(avroString1)); schemas.add(parser.parse(avroString2)); schemas.add(parser.parse(avroString3)); + schemas.add(parser.parse(avroString4)); AvroSchemaToDdlConverter converter = new AvroSchemaToDdlConverter(); Ddl ddl = converter.toDdl(schemas); - assertThat(ddl.sequences(), hasSize(3)); + assertThat(ddl.sequences(), hasSize(4)); assertThat( ddl.prettyPrint(), equalToCompressingWhiteSpace( @@ -957,7 +1010,8 @@ public void sequences() { + "OPTIONS (sequence_kind=\"bit_reversed_positive\", " + "start_with_counter=9999)\n" + "CREATE SEQUENCE `Sequence3`\n\t" - + "OPTIONS (sequence_kind=\"bit_reversed_positive\")")); + + "OPTIONS (sequence_kind=\"bit_reversed_positive\")\n" + + "CREATE SEQUENCE `Sequence4`")); } @Test @@ -996,16 +1050,27 @@ public void pgSequences() { + " \"googleFormatVersion\" : \"booleans\"," + " \"sequenceKind\" : \"bit_reversed_positive\"" + "}"; + String avroString4 = + "{" + + " \"type\" : \"record\"," + + " \"name\" : \"Sequence4\"," + + " \"fields\" : []," + + " \"namespace\" : \"spannertest\"," + + " \"googleStorage\" : \"CloudSpanner\"," + + " \"googleFormatVersion\" : \"booleans\"," + + " \"sequenceKind\" : \"default\"" + + "}"; Collection schemas = new ArrayList<>(); Schema.Parser parser = new Schema.Parser(); schemas.add(parser.parse(avroString1)); schemas.add(parser.parse(avroString2)); schemas.add(parser.parse(avroString3)); + schemas.add(parser.parse(avroString4)); AvroSchemaToDdlConverter converter = new AvroSchemaToDdlConverter(Dialect.POSTGRESQL); Ddl ddl = converter.toDdl(schemas); assertEquals(ddl.dialect(), Dialect.POSTGRESQL); - assertThat(ddl.sequences(), hasSize(3)); + assertThat(ddl.sequences(), hasSize(4)); assertThat( ddl.prettyPrint(), equalToCompressingWhiteSpace( @@ -1013,7 +1078,8 @@ public void pgSequences() { + "SKIP RANGE 1 1000 START COUNTER WITH 50" + "\nCREATE SEQUENCE \"Sequence2\" BIT_REVERSED_POSITIVE " + "START COUNTER WITH 9999" - + "\nCREATE SEQUENCE \"Sequence3\" BIT_REVERSED_POSITIVE")); + + "\nCREATE SEQUENCE \"Sequence3\" BIT_REVERSED_POSITIVE" + + "\nCREATE SEQUENCE \"Sequence4\"")); } @Test diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java index 4f054b6ca0..2d97d6a6ab 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java @@ -905,10 +905,112 @@ public void pgChangeStreams() throws Exception { runTest(Dialect.POSTGRESQL); } + @Test + public void identityColumn() throws Exception { + // spotless:off + Ddl.Builder ddlBuilder = Ddl.builder(); + List dbOptionList = new ArrayList<>(); + dbOptionList.add( + Export.DatabaseOption.newBuilder() + .setOptionName("default_sequence_kind") + .setOptionValue("\"bit_reversed_positive\"") + .build()); + ddlBuilder.mergeDatabaseOptions(dbOptionList); + Ddl ddl = ddlBuilder + .createTable("IdentityTable") + .column("id") + .int64() + .isIdentityColumn(true) + .sequenceKind("bit_reversed_positive") + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() + .column("non_key_column") + .int64() + .isIdentityColumn(true) + .sequenceKind("bit_reversed_positive") + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() + .column("no_sequence_kind_column") + .int64() + .isIdentityColumn(true) + .sequenceKind("default") + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() + .column("value").int64().endColumn() + .primaryKey().asc("id").end() + .endTable() + .build(); + // spotless:on + + createAndPopulate(ddl, 10); + runTest(); + } + + @Test + public void pgIdentityColumn() throws Exception { + // spotless:off + Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL); + List dbOptionList = new ArrayList<>(); + dbOptionList.add( + Export.DatabaseOption.newBuilder() + .setOptionName("default_sequence_kind") + .setOptionValue("\"bit_reversed_positive\"") + .build()); + ddlBuilder.mergeDatabaseOptions(dbOptionList); + Ddl ddl = ddlBuilder + .createTable("IdentityTable") + .column("id") + .int64() + .isIdentityColumn(true) + .sequenceKind("bit_reversed_positive") + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() + .column("non_key_column") + .int64() + .isIdentityColumn(true) + .sequenceKind("bit_reversed_positive") + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() + .column("no_sequence_kind_column") + .int64() + .isIdentityColumn(true) + .sequenceKind("default") + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() + .column("value").int64().endColumn() + .primaryKey().asc("id").end() + .endTable() + .build(); + // spotless:on + + createAndPopulate(ddl, 10); + runTest(Dialect.POSTGRESQL); + } + @Test public void sequences() throws Exception { + Ddl.Builder ddlBuilder = Ddl.builder(); + List dbOptionList = new ArrayList<>(); + dbOptionList.add( + Export.DatabaseOption.newBuilder() + .setOptionName("default_sequence_kind") + .setOptionValue("\"bit_reversed_positive\"") + .build()); + ddlBuilder.mergeDatabaseOptions(dbOptionList); Ddl ddl = - Ddl.builder() + ddlBuilder .createSequence("Sequence1") .options( ImmutableList.of( @@ -925,6 +1027,14 @@ public void sequences() throws Exception { .createSequence("Sequence3") .options(ImmutableList.of("sequence_kind=\"bit_reversed_positive\"")) .endSequence() + .createSequence("Sequence4") + .options( + ImmutableList.of( + "sequence_kind=\"default\"", + "skip_range_min=0", + "skip_range_max=1000", + "start_with_counter=50")) + .endSequence() .createTable("UsersWithSequenceId") .column("id") .int64() @@ -946,8 +1056,16 @@ public void sequences() throws Exception { @Test public void pgSequences() throws Exception { + Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL); + List dbOptionList = new ArrayList<>(); + dbOptionList.add( + Export.DatabaseOption.newBuilder() + .setOptionName("default_sequence_kind") + .setOptionValue("\"bit_reversed_positive\"") + .build()); + ddlBuilder.mergeDatabaseOptions(dbOptionList); Ddl ddl = - Ddl.builder(Dialect.POSTGRESQL) + ddlBuilder .createSequence("PGSequence1") .sequenceKind("bit_reversed_positive") .counterStartValue(Long.valueOf(50)) @@ -961,6 +1079,12 @@ public void pgSequences() throws Exception { .createSequence("PGSequence3") .sequenceKind("bit_reversed_positive") .endSequence() + .createSequence("PGSequence4") + .sequenceKind("default") + .counterStartValue(Long.valueOf(50)) + .skipRangeMin(Long.valueOf(0)) + .skipRangeMax(Long.valueOf(1000)) + .endSequence() .createTable("PGUsersWithSequenceId") .column("id") .pgInt8() diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java index acefcf177a..7a333f1d6f 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java @@ -20,6 +20,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_FORMAT_VERSION; import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_STORAGE; import static com.google.cloud.teleport.spanner.AvroUtil.HIDDEN; +import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN; import static com.google.cloud.teleport.spanner.AvroUtil.INPUT; import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL; import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT; @@ -143,6 +144,21 @@ public void simple() { .max() .isHidden(true) .endColumn() + .column("identity_column") + .type(Type.int64()) + .isIdentityColumn(true) + .sequenceKind("bit_reversed_positive") + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() + .column("identity_column_no_kind") + .type(Type.int64()) + .isIdentityColumn(true) + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() .primaryKey() .asc("id") .asc("gen_id") @@ -183,7 +199,7 @@ public void simple() { List fields = avroSchema.getFields(); - assertThat(fields, hasSize(8)); + assertThat(fields, hasSize(10)); assertThat(fields.get(0).name(), equalTo("id")); // Not null @@ -252,6 +268,26 @@ public void simple() { assertThat(fields.get(7).getProp(STORED), equalTo(null)); assertThat(fields.get(7).getProp(HIDDEN), equalTo("true")); + assertThat(fields.get(8).name(), equalTo("identity_column")); + assertThat(fields.get(8).schema(), equalTo(nullableUnion(Schema.Type.LONG))); + assertThat(fields.get(8).getProp(SQL_TYPE), equalTo("INT64")); + assertThat(fields.get(8).getProp(NOT_NULL), equalTo(null)); + assertThat(fields.get(8).getProp(IDENTITY_COLUMN), equalTo("true")); + assertThat(fields.get(8).getProp(SPANNER_SEQUENCE_KIND), equalTo("bit_reversed_positive")); + assertThat(fields.get(8).getProp(SPANNER_SEQUENCE_COUNTER_START), equalTo("1000")); + assertThat(fields.get(8).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN), equalTo("2000")); + assertThat(fields.get(8).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX), equalTo("3000")); + + assertThat(fields.get(9).name(), equalTo("identity_column_no_kind")); + assertThat(fields.get(9).schema(), equalTo(nullableUnion(Schema.Type.LONG))); + assertThat(fields.get(9).getProp(SQL_TYPE), equalTo("INT64")); + assertThat(fields.get(9).getProp(NOT_NULL), equalTo(null)); + assertThat(fields.get(9).getProp(IDENTITY_COLUMN), equalTo("true")); + assertThat(fields.get(9).getProp(SPANNER_SEQUENCE_KIND), equalTo(null)); + assertThat(fields.get(9).getProp(SPANNER_SEQUENCE_COUNTER_START), equalTo("1000")); + assertThat(fields.get(9).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN), equalTo("2000")); + assertThat(fields.get(9).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX), equalTo("3000")); + // spanner pk assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_0"), equalTo("`id` ASC")); assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_1"), equalTo("`gen_id` ASC")); @@ -326,6 +362,21 @@ public void pgSimple() { .generatedAs("MOD(id+1, 64)") .stored() .endColumn() + .column("identity_column") + .type(Type.int64()) + .isIdentityColumn(true) + .sequenceKind("bit_reversed_positive") + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() + .column("identity_column_no_kind") + .type(Type.int64()) + .isIdentityColumn(true) + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() .primaryKey() .asc("id") .asc("gen_id") @@ -356,7 +407,7 @@ public void pgSimple() { List fields = avroSchema.getFields(); - assertThat(fields, hasSize(5)); + assertThat(fields, hasSize(7)); assertThat(fields.get(0).name(), equalTo("id")); // Not null @@ -401,6 +452,26 @@ public void pgSimple() { assertThat(fields.get(4).getProp(STORED), equalTo("true")); assertThat(fields.get(4).getProp(DEFAULT_EXPRESSION), equalTo(null)); + assertThat(fields.get(5).name(), equalTo("identity_column")); + assertThat(fields.get(5).schema(), equalTo(nullableUnion(Schema.Type.LONG))); + assertThat(fields.get(5).getProp(SQL_TYPE), equalTo("INT64")); + assertThat(fields.get(5).getProp(NOT_NULL), equalTo(null)); + assertThat(fields.get(5).getProp(IDENTITY_COLUMN), equalTo("true")); + assertThat(fields.get(5).getProp(SPANNER_SEQUENCE_KIND), equalTo("bit_reversed_positive")); + assertThat(fields.get(5).getProp(SPANNER_SEQUENCE_COUNTER_START), equalTo("1000")); + assertThat(fields.get(5).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN), equalTo("2000")); + assertThat(fields.get(5).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX), equalTo("3000")); + + assertThat(fields.get(6).name(), equalTo("identity_column_no_kind")); + assertThat(fields.get(6).schema(), equalTo(nullableUnion(Schema.Type.LONG))); + assertThat(fields.get(6).getProp(SQL_TYPE), equalTo("INT64")); + assertThat(fields.get(6).getProp(NOT_NULL), equalTo(null)); + assertThat(fields.get(6).getProp(IDENTITY_COLUMN), equalTo("true")); + assertThat(fields.get(6).getProp(SPANNER_SEQUENCE_KIND), equalTo(null)); + assertThat(fields.get(6).getProp(SPANNER_SEQUENCE_COUNTER_START), equalTo("1000")); + assertThat(fields.get(6).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MIN), equalTo("2000")); + assertThat(fields.get(6).getProp(SPANNER_SEQUENCE_SKIP_RANGE_MAX), equalTo("3000")); + // spanner pk assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_0"), equalTo("\"id\" ASC")); assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_1"), equalTo("\"gen_id\" ASC")); @@ -1557,10 +1628,13 @@ public void sequences() { .createSequence("Sequence3") .options(ImmutableList.of("sequence_kind=\"bit_reversed_positive\"")) .endSequence() + .createSequence("Sequence4") + .options(ImmutableList.of("sequence_kind=\"default\"")) + .endSequence() .build(); Collection result = converter.convert(ddl); - assertThat(result, hasSize(3)); + assertThat(result, hasSize(4)); for (Schema s : result) { assertThat(s.getNamespace(), equalTo("spannertest")); assertThat(s.getProp("googleFormatVersion"), equalTo("booleans")); @@ -1590,6 +1664,10 @@ public void sequences() { assertThat( avroSchema3.getProp("sequenceOption_0"), equalTo("sequence_kind=\"bit_reversed_positive\"")); + + Schema avroSchema4 = it.next(); + assertThat(avroSchema4.getName(), equalTo("Sequence4")); + assertThat(avroSchema4.getProp("sequenceOption_0"), equalTo("sequence_kind=\"default\"")); } @Test @@ -1611,10 +1689,13 @@ public void pgSequences() { .createSequence("PGSequence3") .sequenceKind("bit_reversed_positive") .endSequence() + .createSequence("PGSequence4") + .sequenceKind("default") + .endSequence() .build(); Collection result = converter.convert(ddl); - assertThat(result, hasSize(3)); + assertThat(result, hasSize(4)); for (Schema s : result) { assertThat(s.getNamespace(), equalTo("spannertest")); assertThat(s.getProp("googleFormatVersion"), equalTo("booleans")); @@ -1638,6 +1719,10 @@ public void pgSequences() { Schema avroSchema3 = it.next(); assertThat(avroSchema3.getName(), equalTo("PGSequence3")); assertThat(avroSchema3.getProp(SPANNER_SEQUENCE_KIND), equalTo("bit_reversed_positive")); + + Schema avroSchema4 = it.next(); + assertThat(avroSchema4.getName(), equalTo("PGSequence4")); + assertThat(avroSchema4.getProp(SPANNER_SEQUENCE_KIND), equalTo("default")); } @Test diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ImportFromAvroTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ImportFromAvroTest.java index 1064dd846c..adcadd320a 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ImportFromAvroTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ImportFromAvroTest.java @@ -1040,6 +1040,61 @@ public void pgGeneratedColumns() throws Exception { Dialect.POSTGRESQL); } + @Test + public void identityColumns() throws Exception { + SchemaBuilder.RecordBuilder record = SchemaBuilder.record("identityColumns"); + SchemaBuilder.FieldAssembler fieldAssembler = record.fields(); + + fieldAssembler + // Primary key. + .requiredLong("id") + // Integer columns. + .optionalLong("optional_identity") + .optionalLong("value"); + Schema schema = fieldAssembler.endRecord(); + String spannerSchema = + "CREATE TABLE `AvroTable` (" + + "`id` INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (" + + " SKIP RANGE 2000, 3000 START COUNTER WITH 1000)," + + "`optional_identity` INT64 GENERATED BY DEFAULT AS IDENTITY (" + + " BIT_REVERSED_POSITIVE SKIP RANGE 2000, 3000 START COUNTER WITH 1000)," + + "`value` INT64," + + ") PRIMARY KEY (`id`)"; + + runTest( + schema, + spannerSchema, + Arrays.asList(new GenericRecordBuilder(schema).set("value", 1L).build())); + } + + @Test + public void pgIdentityColumns() throws Exception { + SchemaBuilder.RecordBuilder record = SchemaBuilder.record("identityColumns"); + SchemaBuilder.FieldAssembler fieldAssembler = record.fields(); + + fieldAssembler + // Primary key. + .requiredLong("id") + // Integer columns. + .optionalLong("optional_identity") + .optionalLong("value"); + Schema schema = fieldAssembler.endRecord(); + String spannerSchema = + "CREATE TABLE \"AvroTable\" (" + + "\"id\" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY (" + + " SKIP RANGE 2000 3000 START COUNTER WITH 1000)," + + "\"optional_identity\" bigint GENERATED BY DEFAULT AS IDENTITY (" + + " BIT_REVERSED_POSITIVE SKIP RANGE 2000 3000 START COUNTER WITH 1000)," + + "\"value\" bigint," + + "PRIMARY KEY (\"id\")" + + ")"; + + runTest( + schema, + spannerSchema, + Arrays.asList(new GenericRecordBuilder(schema).set("value", 1L).build())); + } + @Test public void defaultColumns() throws Exception { SchemaBuilder.RecordBuilder record = SchemaBuilder.record("defaultColumns"); @@ -1648,6 +1703,13 @@ public void sequences() throws Exception { "CREATE SEQUENCE `Sequence2`" + " OPTIONS (sequence_kind=\"bit_reversed_positive\", " + " skip_range_min=0, skip_range_max=1000, start_with_counter=50)"; + String sequence2Def = + "CREATE SEQUENCE `Sequence3`" + + " OPTIONS (skip_range_min=0, skip_range_max=1000, start_with_counter=50)"; + String sequence3Def = "CREATE SEQUENCE `Sequence4` SKIP RANGE 0, 1000 START COUNTER WITH 50"; + String sequence4Def = + "CREATE SEQUENCE `Sequence5` BIT_REVERSED_POSITIVE " + + "SKIP RANGE 0, 1000 START COUNTER WITH 50"; String tableDef = "CREATE TABLE `T` (" + "`id` INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE Sequence2))," @@ -1655,7 +1717,8 @@ public void sequences() throws Exception { + "`c2` INT64," + ") PRIMARY KEY (`id`)"; - SPANNER_SERVER.createDatabase(dbName, Arrays.asList(sequenceDef, tableDef)); + SPANNER_SERVER.createDatabase( + dbName, Arrays.asList(sequenceDef, sequence2Def, sequence3Def, sequence4Def, tableDef)); // Run the import pipeline. importPipeline.apply( @@ -1689,6 +1752,13 @@ public void sequences() throws Exception { + " skip_range_max=1000," + " skip_range_min=0," + " start_with_counter=50)" + + "\nCREATE SEQUENCE `Sequence3`\n\tOPTIONS " + + "(skip_range_max=1000," + + " skip_range_min=0," + + " start_with_counter=50)" + + "\nCREATE SEQUENCE `Sequence4` SKIP RANGE 0, 1000 START COUNTER WITH 50" + + "\nCREATE SEQUENCE `Sequence5` BIT_REVERSED_POSITIVE" + + " SKIP RANGE 0, 1000 START COUNTER WITH 50" + "CREATE TABLE `T` (\n\t" + "`id` INT64 NOT NULL " + "DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE Sequence2)),\n\t" @@ -1740,13 +1810,15 @@ public void pgSequences() throws Exception { String sequenceDef = "CREATE SEQUENCE \"PGSequence2\" BIT_REVERSED_POSITIVE" + " SKIP RANGE 0 1000 START COUNTER WITH 50"; + String sequence2Def = + "CREATE SEQUENCE \"PGSequence3\"" + " SKIP RANGE 0 1000 START COUNTER WITH 50"; String tableDef = "CREATE TABLE \"T\" (" + "\"id\" bigint NOT NULL DEFAULT nextval('\"PGSequence2\"')," + "\"c\" bigint," + "PRIMARY KEY (\"id\"))"; - SPANNER_SERVER.createPgDatabase(dbName, Arrays.asList(sequenceDef, tableDef)); + SPANNER_SERVER.createPgDatabase(dbName, Arrays.asList(sequenceDef, sequence2Def, tableDef)); // Run the import pipeline. importPipeline.apply( @@ -1774,6 +1846,8 @@ public void pgSequences() throws Exception { + " SKIP RANGE 10 10000 START COUNTER WITH 99" + "\nCREATE SEQUENCE \"PGSequence2\" BIT_REVERSED_POSITIVE" + " SKIP RANGE 0 1000 START COUNTER WITH 50" + + "\nCREATE SEQUENCE \"PGSequence3\"" + + " SKIP RANGE 0 1000 START COUNTER WITH 50" + "CREATE TABLE \"T\" (" + "\n\t\"id\" bigint NOT NULL" + " DEFAULT nextval('\"PGSequence2\"'::text),\n\t" @@ -2111,6 +2185,14 @@ private void runTest( .setOptionName("version_retention_period") .setOptionValue(dialect == Dialect.GOOGLE_STANDARD_SQL ? "\"4d\"" : "'4d'") .build()) + .addDatabaseOptions( + ExportProtos.Export.DatabaseOption.newBuilder() + .setOptionName("default_sequence_kind") + .setOptionValue( + dialect == Dialect.GOOGLE_STANDARD_SQL + ? "\"bit_reversed_positive\"" + : "'bit_reversed_positive'") + .build()) .setDialect(ProtoDialect.valueOf(dialect.name())) .build(); JsonFormat.printer().print(exportProto); diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java index 78de2f8b2d..fc46dd17c3 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/DdlTest.java @@ -105,6 +105,21 @@ public void simple() { .max() .isHidden(true) .endColumn() + .column("identity_column") + .type(Type.int64()) + .isIdentityColumn(true) + .sequenceKind("bit_reversed_positive") + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() + .column("identity_column_no_kind") + .type(Type.int64()) + .isIdentityColumn(true) + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() .primaryKey() .asc("id") .asc("gen_id") @@ -133,6 +148,12 @@ public void simple() { .setOptionValue("4d") .setOptionType("STRING") .build()) + .addDatabaseOptions( + Export.DatabaseOption.newBuilder() + .setOptionName("default_sequence_kind") + .setOptionValue("bit_reversed_positive") + .setOptionType("STRING") + .build()) .build(); builder.mergeDatabaseOptions(export.getDatabaseOptionsList()); Ddl ddl = builder.build(); @@ -140,6 +161,7 @@ public void simple() { ddl.prettyPrint(), equalToCompressingWhiteSpace( "ALTER DATABASE `%db_name%` SET OPTIONS ( version_retention_period = \"4d\" )" + + " ALTER DATABASE `%db_name%` SET OPTIONS ( default_sequence_kind = \"bit_reversed_positive\" )" + " CREATE TABLE `Users` (" + " `id` INT64 NOT NULL," + " `gen_id` INT64 NOT NULL AS (MOD(id+1, 64)) STORED," @@ -147,6 +169,10 @@ public void simple() { + " `last_name` STRING(MAX)," + " `full_name` STRING(MAX) AS (CONCAT(first_name, ' ', last_name)) STORED," + " `HiddenColumn` STRING(MAX) HIDDEN," + + " `identity_column` INT64 GENERATED BY DEFAULT AS IDENTITY (" + + "BIT_REVERSED_POSITIVE SKIP RANGE 2000, 3000 START COUNTER WITH 1000)," + + " `identity_column_no_kind` INT64 GENERATED BY DEFAULT AS IDENTITY (" + + "SKIP RANGE 2000, 3000 START COUNTER WITH 1000)," + " CONSTRAINT `ck` CHECK (`first_name` != `last_name`)," + " ) PRIMARY KEY (`id` ASC, `gen_id` ASC)" + " CREATE INDEX `UsersByFirstName` ON `Users` (`first_name`)" @@ -162,7 +188,7 @@ public void simple() { + " FOREIGN KEY (`last_name`) REFERENCES " + "`AllowedNames` (`last_name`) ENFORCED")); List statements = ddl.statements(); - assertEquals(8, statements.size()); + assertEquals(9, statements.size()); assertThat( statements.get(0), equalToCompressingWhiteSpace( @@ -173,6 +199,10 @@ public void simple() { + " `last_name` STRING(MAX)," + " `full_name` STRING(MAX) AS (CONCAT(first_name, ' ', last_name)) STORED," + " `HiddenColumn` STRING(MAX) HIDDEN," + + " `identity_column` INT64 GENERATED BY DEFAULT AS IDENTITY (" + + "BIT_REVERSED_POSITIVE SKIP RANGE 2000, 3000 START COUNTER WITH 1000)," + + " `identity_column_no_kind` INT64 GENERATED BY DEFAULT AS IDENTITY (" + + "SKIP RANGE 2000, 3000 START COUNTER WITH 1000)," + " CONSTRAINT `ck` CHECK (`first_name` != `last_name`)," + " ) PRIMARY KEY (`id` ASC, `gen_id` ASC)")); assertThat( @@ -209,6 +239,10 @@ public void simple() { statements.get(7), equalToCompressingWhiteSpace( "ALTER DATABASE `%db_name%` SET OPTIONS ( version_retention_period = \"4d\" )")); + assertThat( + statements.get(8), + equalToCompressingWhiteSpace( + "ALTER DATABASE `%db_name%` SET OPTIONS ( default_sequence_kind = \"bit_reversed_positive\" )")); assertNotNull(ddl.hashCode()); } @@ -243,6 +277,21 @@ public void pgSimple() { .generatedAs("CONCAT(first_name, ' ', last_name)") .stored() .endColumn() + .column("identity_column") + .pgInt8() + .isIdentityColumn(true) + .sequenceKind("bit_reversed_positive") + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() + .column("identity_column_no_kind") + .pgInt8() + .isIdentityColumn(true) + .counterStartValue(1000L) + .skipRangeMin(2000L) + .skipRangeMax(3000L) + .endColumn() .column("update_time") .pgSpannerCommitTimestamp() .notNull() @@ -270,6 +319,12 @@ public void pgSimple() { .setOptionValue("4d") .setOptionType("character varying") .build()) + .addDatabaseOptions( + Export.DatabaseOption.newBuilder() + .setOptionName("default_sequence_kind") + .setOptionValue("bit_reversed_positive") + .setOptionType("character varying") + .build()) .build(); builder.mergeDatabaseOptions(export.getDatabaseOptionsList()); Ddl ddl = builder.build(); @@ -277,6 +332,7 @@ public void pgSimple() { ddl.prettyPrint(), equalToCompressingWhiteSpace( "ALTER DATABASE \"%db_name%\" SET spanner.version_retention_period = '4d'" + + " ALTER DATABASE \"%db_name%\" SET spanner.default_sequence_kind = 'bit_reversed_positive'" + " CREATE TABLE \"Users\" (" + " \"id\" bigint NOT NULL," + " \"gen_id\" bigint NOT NULL GENERATED ALWAYS AS (MOD(id+1, 64)) STORED," @@ -284,6 +340,10 @@ public void pgSimple() { + " \"last_name\" character varying DEFAULT Lennon," + " \"full_name\" character varying GENERATED ALWAYS AS" + " (CONCAT(first_name, ' ', last_name)) STORED," + + " \"identity_column\" bigint GENERATED BY DEFAULT AS IDENTITY (" + + "BIT_REVERSED_POSITIVE SKIP RANGE 2000 3000 START COUNTER WITH 1000)," + + " \"identity_column_no_kind\" bigint GENERATED BY DEFAULT AS IDENTITY (" + + "SKIP RANGE 2000 3000 START COUNTER WITH 1000)," + " \"update_time\" spanner.commit_timestamp NOT NULL," + " CONSTRAINT \"ck\" CHECK (\"first_name\" != \"last_name\")," + " PRIMARY KEY (\"id\", \"gen_id\")" @@ -1047,22 +1107,34 @@ public void sequences() { "skip_range_max=1000", "start_with_counter=50")) .endSequence() + .createSequence("MySequence2") + .options( + ImmutableList.of( + "skip_range_min=0", "skip_range_max=1000", "start_with_counter=50")) + .endSequence() .build(); assertThat( ddl.prettyPrint(), equalToCompressingWhiteSpace( "CREATE SEQUENCE `MySequence`" + " OPTIONS (sequence_kind=\"bit_reversed_positive\", " - + " skip_range_min=0, skip_range_max=1000, start_with_counter=50)")); + + " skip_range_min=0, skip_range_max=1000, start_with_counter=50)" + + " CREATE SEQUENCE `MySequence2`" + + " OPTIONS (skip_range_min=0, skip_range_max=1000, start_with_counter=50)")); List statements = ddl.statements(); - assertEquals(1, statements.size()); + assertEquals(2, statements.size()); assertThat( statements.get(0), equalToCompressingWhiteSpace( "CREATE SEQUENCE `MySequence`" + " OPTIONS (sequence_kind=\"bit_reversed_positive\", " + " skip_range_min=0, skip_range_max=1000, start_with_counter=50)")); + assertThat( + statements.get(1), + equalToCompressingWhiteSpace( + "CREATE SEQUENCE `MySequence2`" + + " OPTIONS (skip_range_min=0, skip_range_max=1000, start_with_counter=50)")); assertNotNull(ddl.hashCode()); } @@ -1079,16 +1151,22 @@ public void pgSequences() { .createSequence("MyPGSequence2") .sequenceKind("bit_reversed_positive") .endSequence() + .createSequence("MyPGSequence3") + .counterStartValue(Long.valueOf(30)) + .skipRangeMin(Long.valueOf(1)) + .skipRangeMax(Long.valueOf(1000)) + .endSequence() .build(); assertThat( ddl.prettyPrint(), equalToCompressingWhiteSpace( "\nCREATE SEQUENCE \"MyPGSequence\" BIT_REVERSED_POSITIVE" + " SKIP RANGE 1 1000 START COUNTER WITH 30 " - + "\nCREATE SEQUENCE \"MyPGSequence2\" BIT_REVERSED_POSITIVE")); + + "\nCREATE SEQUENCE \"MyPGSequence2\" BIT_REVERSED_POSITIVE" + + "\nCREATE SEQUENCE \"MyPGSequence3\" SKIP RANGE 1 1000 START COUNTER WITH 30")); List statements = ddl.statements(); - assertEquals(2, statements.size()); + assertEquals(3, statements.size()); assertThat( statements.get(0), equalToCompressingWhiteSpace( @@ -1097,6 +1175,10 @@ public void pgSequences() { assertThat( statements.get(1), equalToCompressingWhiteSpace("CREATE SEQUENCE \"MyPGSequence2\" BIT_REVERSED_POSITIVE")); + assertThat( + statements.get(2), + equalToCompressingWhiteSpace( + "CREATE SEQUENCE \"MyPGSequence3\" SKIP RANGE 1 1000 START COUNTER WITH 30")); assertNotNull(ddl.hashCode()); } diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java index 380e5abf37..03e4aaa894 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java @@ -972,6 +972,40 @@ public void pgDefaultColumns() throws Exception { assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(statement)); } + @Test + public void identityColumns() throws Exception { + List statements = + Arrays.asList( + "ALTER DATABASE `" + + dbId + + "` SET OPTIONS ( default_sequence_kind = \"bit_reversed_positive\" )", + "CREATE TABLE `T` (" + + " `id` INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY," + + " `non_key_col` INT64 NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE)," + + " ) PRIMARY KEY (`id` ASC)"); + + SPANNER_SERVER.createDatabase(dbId, statements); + Ddl ddl = getDatabaseDdl(); + assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(String.join("", statements))); + } + + @Test + public void pgIdentityColumns() throws Exception { + List statements = + Arrays.asList( + "ALTER DATABASE \"" + + dbId + + "\" SET spanner.default_sequence_kind = 'bit_reversed_positive'", + "CREATE TABLE \"T\" (" + + " \"id\" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY," + + " \"non_key_col\" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE)," + + " PRIMARY KEY (\"id\") )"); + + SPANNER_SERVER.createPgDatabase(dbId, statements); + Ddl ddl = getPgDatabaseDdl(); + assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(String.join("", statements))); + } + @Test public void databaseOptions() throws Exception { List statements = @@ -1080,12 +1114,21 @@ public void pgChangeStreams() throws Exception { public void sequences() throws Exception { List statements = Arrays.asList( + "ALTER DATABASE `" + + dbId + + "` SET OPTIONS ( default_sequence_kind = \"bit_reversed_positive\" )", "CREATE SEQUENCE `MySequence` OPTIONS (" + "sequence_kind = \"bit_reversed_positive\")", "CREATE SEQUENCE `MySequence2` OPTIONS (" + "sequence_kind = \"bit_reversed_positive\"," + "skip_range_min = 1," + "skip_range_max = 1000," + "start_with_counter = 100)", + "CREATE SEQUENCE `MySequence3` OPTIONS (" + + "skip_range_min = 1," + + "skip_range_max = 1000," + + "start_with_counter = 100)", + "CREATE SEQUENCE `MySequence4`", + "CREATE SEQUENCE `MySequence5` BIT_REVERSED_POSITIVE SKIP RANGE 1, 1000 START COUNTER WITH 100", "CREATE TABLE `Account` (" + " `id` INT64 DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE MySequence))," + " `balanceId` INT64 NOT NULL," @@ -1094,13 +1137,22 @@ public void sequences() throws Exception { SPANNER_SERVER.createDatabase(dbId, statements); Ddl ddl = getDatabaseDdl(); String expectedDdl = - "\nCREATE SEQUENCE `MySequence`\n\tOPTIONS " + "ALTER DATABASE `" + + dbId + + "` SET OPTIONS ( default_sequence_kind = \"bit_reversed_positive\" )" + + "\nCREATE SEQUENCE `MySequence`\n\tOPTIONS " + "(sequence_kind=\"bit_reversed_positive\")\n" - + "CREATE SEQUENCE `MySequence2`\n\tOPTIONS " + + "\nCREATE SEQUENCE `MySequence2`\n\tOPTIONS " + "(sequence_kind=\"bit_reversed_positive\"," + " skip_range_max=1000," + " skip_range_min=1," + " start_with_counter=100)" + + "\nCREATE SEQUENCE `MySequence3`\n\tOPTIONS " + + "(skip_range_max=1000," + + " skip_range_min=1," + + " start_with_counter=100)" + + "\nCREATE SEQUENCE `MySequence4`" + + "\nCREATE SEQUENCE `MySequence5` BIT_REVERSED_POSITIVE SKIP RANGE 1, 1000 START COUNTER WITH 100" + "CREATE TABLE `Account` (" + "\n\t`id` INT64 DEFAULT" + " (GET_NEXT_SEQUENCE_VALUE(SEQUENCE MySequence))," @@ -1113,9 +1165,13 @@ public void sequences() throws Exception { public void pgSequences() throws Exception { List statements = Arrays.asList( + "ALTER DATABASE \"" + + dbId + + "\" SET spanner.default_sequence_kind = 'bit_reversed_positive'", "CREATE SEQUENCE \"MyPGSequence\" BIT_REVERSED_POSITIVE", "CREATE SEQUENCE \"MyPGSequence2\" BIT_REVERSED_POSITIVE" + " SKIP RANGE 1 1000 START COUNTER WITH 100", + "CREATE SEQUENCE \"MyPGSequence3\"" + " SKIP RANGE 1 1000 START COUNTER WITH 100", "CREATE TABLE \"Account\" (" + " \"id\" bigint DEFAULT nextval('\"MyPGSequence\"')," + " \"balanceId\" bigint NOT NULL," @@ -1124,10 +1180,15 @@ public void pgSequences() throws Exception { SPANNER_SERVER.createPgDatabase(dbId, statements); Ddl ddl = getPgDatabaseDdl(); String expectedDdl = - "\nCREATE SEQUENCE \"MyPGSequence\" BIT_REVERSED_POSITIVE" + "ALTER DATABASE \"" + + dbId + + "\" SET spanner.default_sequence_kind = 'bit_reversed_positive'" + + "\nCREATE SEQUENCE \"MyPGSequence\" BIT_REVERSED_POSITIVE" + " START COUNTER WITH 1" + "\nCREATE SEQUENCE \"MyPGSequence2\" BIT_REVERSED_POSITIVE" + " SKIP RANGE 1 1000 START COUNTER WITH 100" + + "\nCREATE SEQUENCE \"MyPGSequence3\"" + + " SKIP RANGE 1 1000 START COUNTER WITH 100" + "CREATE TABLE \"Account\" (" + "\n\t\"id\" bigint NOT NULL" + " DEFAULT nextval('\"MyPGSequence\"'::text),\n\t" diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java index aa78f7af83..2adef05947 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java @@ -53,7 +53,9 @@ public void testListColumnsSQL() { + " FROM information_schema.constraint_column_usage AS c" + " WHERE c.constraint_name = CONCAT('PLACEMENT_KEY_', c.table_name))" + " SELECT c.table_schema, c.table_name, c.column_name, c.ordinal_position, c.spanner_type, c.is_nullable," - + " c.is_generated, c.generation_expression, c.is_stored, c.column_default, c.is_hidden," + + " c.is_generated, c.generation_expression, c.is_stored," + + " c.column_default, c.is_identity, c.identity_kind, c.identity_start_with_counter," + + " c.identity_skip_range_min, c.identity_skip_range_max, c.is_hidden," + " pkc.constraint_name IS NOT NULL AS is_placement_key" + " FROM information_schema.columns as c" + " LEFT JOIN placementkeycolumns AS pkc" @@ -70,6 +72,8 @@ public void testListColumnsSQL() { + " WHERE c.constraint_name = CONCAT('PLACEMENT_KEY_', c.table_name))" + " SELECT c.table_schema, c.table_name, c.column_name, c.ordinal_position, c.spanner_type, c.is_nullable," + " c.is_generated, c.generation_expression, c.is_stored, c.column_default," + + " c.is_identity, c.identity_kind, c.identity_start_with_counter," + + " c.identity_skip_range_min, c.identity_skip_range_max," + " pkc.constraint_name IS NOT NULL AS is_placement_key" + " FROM information_schema.columns as c" + " LEFT JOIN placementkeycolumns AS pkc" diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomInsertMutationGenerator.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomInsertMutationGenerator.java index b49ee692fc..87d75cf591 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomInsertMutationGenerator.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomInsertMutationGenerator.java @@ -155,7 +155,7 @@ public TableSupplier(Table table) { } } for (Column column : table.columns()) { - if (!column.isGenerated()) { + if (!column.isGenerated() && !column.isIdentityColumn()) { valueGenerators.put( column.name(), randomValueGenerator