SNOW-1772187 multiple columns evolution bug #980

Closed
2 changes: 2 additions & 0 deletions pom.xml
@@ -345,6 +345,8 @@
<artifactId>snowflake-jdbc</artifactId>
</exclusion>
</exclusions>
<scope>system</scope>
<systemPath>/Users/bzabek/snowflake-kafka-connector/snowflake-ingest-sdk.jar</systemPath>
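<!-- Note (assumption, not in the original diff): system scope with an absolute
     systemPath pins the build to a locally built snowflake-ingest-sdk.jar on the
     author's machine, presumably to reproduce the bug against a local ingest SDK
     build; a machine-specific path like this would need reverting before merge. -->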
</dependency>

<dependency>
@@ -16,7 +16,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -35,7 +34,7 @@ protected void createIcebergTable() {

@ParameterizedTest(name = "{0}")
@MethodSource("prepareData")
@Disabled
// @Disabled
void shouldEvolveSchemaAndInsertRecords(
String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
throws Exception {
@@ -81,6 +80,59 @@ void shouldEvolveSchemaAndInsertRecords(
assertRecordsInTable();
}

@ParameterizedTest(name = "{0}")
@MethodSource("prepareData")
// @Disabled
void shouldEvolveSchemaAndInsertRecords_withObjects(
String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
throws Exception {
// start off with just one column
List<DescribeTableRow> rows = describeTable(tableName);
assertThat(rows)
.hasSize(1)
.extracting(DescribeTableRow::getColumn)
.contains(Utils.TABLE_COLUMN_METADATA);

SinkRecord record = createKafkaRecord(message, 0, withSchema);
service.insert(Collections.singletonList(record));
waitForOffset(-1);
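// Offset is still -1 here: the first record presumably only triggers schema
// evolution and is not ingested, so no offset has been committed yet.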
rows = describeTable(tableName);
assertThat(rows).hasSize(9);

// don't check the metadata column schema; we have different tests for that
rows =
rows.stream()
.filter(r -> !r.getColumn().equals(Utils.TABLE_COLUMN_METADATA))
.collect(Collectors.toList());

assertThat(rows).containsExactlyInAnyOrder(expectedSchema);

// resend and store same record without any issues now
service.insert(Collections.singletonList(record));
waitForOffset(1);

// and another record with same schema
service.insert(Collections.singletonList(createKafkaRecord(message, 1, withSchema)));
Contributor Author:

This insertion works fine. It adds 9 new columns.

waitForOffset(2);

String mapRecordJson =
"{"
+ " \"type\": \"map\", "
+ " \"key-id\": 4, "
+ " \"key\": \"string\", "
+ " \"value-id\": 5, "
+ " \"value-required\": false, "
+ " \"value\": \"double\" "
+ "}";

// reinsert record with extra field
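// Sent without a schema, this payload is parsed as a plain JSON object with six
// top-level keys (type, key-id, key, value-id, value-required, value), so schema
// evolution should grow the table from 9 to 9 + 6 = 15 columns.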
service.insert(Collections.singletonList(createKafkaRecord(mapRecordJson, 2, false)));
rows = describeTable(tableName);
assertThat(rows).hasSize(15);
Contributor Author:

The number of columns is correct: the schema evolution logic altered the table.

service.insert(Collections.singletonList(createKafkaRecord(mapRecordJson, 2, false)));
waitForOffset(3); // FAIL
@sfc-gh-bzabek Oct 29, 2024:

However, here it fails to ingest the data, with the error described in the Jira ticket (SNOW-1772187).

}
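
For context, a minimal sketch of what a waitForOffset-style helper presumably does in these integration tests. This is an assumption: the real helper lives in the connector's test base class, and getCommittedOffset() below is a hypothetical stand-in for however the test reads the connector's committed offset.

// Polls until the connector commits the expected offset, or times out.
// waitForOffset(3) failing therefore means offset 3 was never committed,
// i.e. the re-sent record was never successfully ingested.
private void waitForOffset(long expectedOffset) throws InterruptedException {
  long deadline = System.currentTimeMillis() + 30_000; // 30-second timeout
  while (System.currentTimeMillis() < deadline) {
    if (getCommittedOffset() == expectedOffset) { // hypothetical accessor
      return;
    }
    Thread.sleep(500); // poll every half second
  }
  throw new AssertionError("offset " + expectedOffset + " was never committed");
}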

private void assertRecordsInTable() {
List<RecordWithMetadata<PrimitiveJsonRecord>> recordsWithMetadata =
selectAllSchematizedRecords();